1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53use crate::properties::manual_proof;
54use crate::staging_util::get_this_crate;
55
56pub mod dynamic;
57
58pub mod external_process;
59pub use external_process::External;
60
61pub mod process;
62pub use process::Process;
63
64pub mod cluster;
65pub use cluster::Cluster;
66
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70pub mod tick;
71pub use tick::{Atomic, Tick};
72
73#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77 Joined,
79 Left,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
89pub enum NetworkHint {
90 Auto,
92 TcpPort(Option<u16>),
97}
98
99pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
100 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
101}
102
103#[stageleft::export(LocationKey)]
104new_key_type! {
105 pub struct LocationKey;
107}
108
109impl std::fmt::Display for LocationKey {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 write!(f, "loc{:?}", self.data()) }
113}
114
115impl std::str::FromStr for LocationKey {
118 type Err = Option<ParseIntError>;
119
120 fn from_str(s: &str) -> Result<Self, Self::Err> {
121 let nvn = s.strip_prefix("loc").ok_or(None)?;
122 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
123 let idx: u64 = idx.parse()?;
124 let ver: u64 = ver.parse()?;
125 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
126 }
127}
128
129impl LocationKey {
130 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
136 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); #[cfg(test)]
140 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); }
142
143impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
145 type O = LocationKey;
146
147 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
148 where
149 Self: Sized,
150 {
151 let root = get_this_crate();
152 let n = Key::data(&self).as_ffi();
153 (
154 QuoteTokens {
155 prelude: None,
156 expr: Some(quote! {
157 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
158 }),
159 },
160 (),
161 )
162 }
163}
164
165#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
167pub enum LocationType {
168 Process,
170 Cluster,
172 External,
174}
175
176pub trait TopLevel<'a>: Location<'a> {}
178
179#[expect(
193 private_bounds,
194 reason = "only internal Hydro code can define location types"
195)]
196pub trait Location<'a>: DynLocation {
197 type Root: Location<'a>;
202
203 type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
205
206 fn root(&self) -> Self::Root;
211
212 fn drop_consistency(&self) -> Self::DropConsistency;
214 fn consistency() -> Option<ClusterConsistency>;
216
217 fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
219 L2::from_drop_consistency(self.drop_consistency())
220 }
221
222 #[doc(hidden)]
223 fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
224
225 fn try_tick(&self) -> Option<Tick<Self>> {
232 if Self::is_top_level() {
233 let id = self.flow_state().borrow_mut().next_clock_id();
234 Some(Tick {
235 id,
236 l: self.clone(),
237 })
238 } else {
239 None
240 }
241 }
242
243 fn id(&self) -> LocationId {
245 DynLocation::dyn_id(self)
246 }
247
248 fn tick(&self) -> Tick<Self> {
274 if let LocationId::Tick(_, _) = self.id() {
275 panic!("cannot create nested ticks");
276 }
277
278 let id = self.flow_state().borrow_mut().next_clock_id();
279 Tick {
280 id,
281 l: self.clone(),
282 }
283 }
284
285 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
310 where
311 Self: TopLevel<'a> + Sized,
312 {
313 Stream::new(
314 self.clone(),
315 HydroNode::Source {
316 source: HydroSource::Spin(),
317 metadata: self.new_node_metadata(Stream::<
318 (),
319 Self,
320 Unbounded,
321 TotalOrder,
322 ExactlyOnce,
323 >::collection_kind()),
324 },
325 )
326 }
327
328 fn source_stream<T, E>(
349 &self,
350 e: impl QuotedWithContext<'a, E, Self>,
351 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
352 where
353 E: FuturesStream<Item = T> + Unpin,
354 Self: TopLevel<'a> + Sized,
355 {
356 let e = e.splice_untyped_ctx(self);
357
358 let target_location = self.drop_consistency();
359 Stream::new(
360 target_location.clone(),
361 HydroNode::Source {
362 source: HydroSource::Stream(e.into()),
363 metadata: target_location.new_node_metadata(Stream::<
364 T,
365 Self::DropConsistency,
366 Unbounded,
367 TotalOrder,
368 ExactlyOnce,
369 >::collection_kind()),
370 },
371 )
372 }
373
374 fn source_iter<T, E>(
396 &self,
397 e: impl QuotedWithContext<'a, E, Self>,
398 ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
399 where
400 E: IntoIterator<Item = T>,
401 Self: Sized,
402 {
403 let e = e.splice_typed_ctx(self);
404
405 let target_location = self.drop_consistency();
406 Stream::new(
407 target_location.clone(),
408 HydroNode::Source {
409 source: HydroSource::Iter(e.into()),
410 metadata: target_location.new_node_metadata(Stream::<
411 T,
412 Self::DropConsistency,
413 Bounded,
414 TotalOrder,
415 ExactlyOnce,
416 >::collection_kind()),
417 },
418 )
419 }
420
421 #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
422 fn source_cluster_members<C: 'a>(
461 &self,
462 cluster: &Cluster<'a, C>,
463 nondet_start: NonDet,
464 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
465 where
466 Self: TopLevel<'a> + Sized,
467 {
468 self.source_cluster_membership_stream(cluster, nondet_start)
469 }
470
471 fn source_cluster_membership_stream<C: 'a>(
510 &self,
511 cluster: &Cluster<'a, C>,
512 _nondet_start: NonDet,
513 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
514 where
515 Self: TopLevel<'a> + Sized,
516 {
517 let target_consistency = self.drop_consistency();
518 Stream::new(
519 target_consistency.clone(),
520 HydroNode::Source {
521 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
522 metadata: target_consistency.new_node_metadata(Stream::<
523 (TaglessMemberId, MembershipEvent),
524 Self,
525 Unbounded,
526 TotalOrder,
527 ExactlyOnce,
528 >::collection_kind(
529 )),
530 },
531 )
532 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
533 .into_keyed()
534 }
535
536 fn source_external_bytes<L>(
544 &self,
545 from: &External<L>,
546 ) -> (
547 ExternalBytesPort,
548 Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
549 )
550 where
551 Self: TopLevel<'a> + Sized,
552 {
553 let (port, stream, sink) =
554 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
555
556 sink.complete(stream.location().source_iter(q!([])));
557
558 (port, stream)
559 }
560
561 #[expect(clippy::type_complexity, reason = "stream markers")]
568 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
569 &self,
570 from: &External<L>,
571 ) -> (
572 ExternalBincodeSink<T, NotMany, O, R>,
573 Stream<T, Self::DropConsistency, Unbounded, O, R>,
574 )
575 where
576 Self: TopLevel<'a> + Sized,
577 T: Serialize + DeserializeOwned,
578 {
579 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
580 sink.complete(stream.location().source_iter(q!([])));
581
582 (
583 ExternalBincodeSink {
584 process_key: from.key,
585 port_id: port.port_id,
586 _phantom: PhantomData,
587 },
588 stream.weaken_ordering().weaken_retries(),
589 )
590 }
591
592 fn embedded_input<T>(
598 &self,
599 name: impl Into<String>,
600 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
601 where
602 Self: TopLevel<'a> + Sized,
603 {
604 let ident = syn::Ident::new(&name.into(), Span::call_site());
605
606 let target_location = self.drop_consistency();
607 Stream::new(
608 target_location.clone(),
609 HydroNode::Source {
610 source: HydroSource::Embedded(ident),
611 metadata: target_location.new_node_metadata(Stream::<
612 T,
613 Self,
614 Unbounded,
615 TotalOrder,
616 ExactlyOnce,
617 >::collection_kind()),
618 },
619 )
620 }
621
622 fn embedded_singleton_input<T>(
628 &self,
629 name: impl Into<String>,
630 ) -> Singleton<T, Self::DropConsistency, Bounded>
631 where
632 Self: TopLevel<'a> + Sized,
633 {
634 let ident = syn::Ident::new(&name.into(), Span::call_site());
635
636 let target_location = self.drop_consistency();
637 Singleton::new(
638 target_location.clone(),
639 HydroNode::Source {
640 source: HydroSource::EmbeddedSingleton(ident),
641 metadata: target_location
642 .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
643 },
644 )
645 }
646
647 #[expect(clippy::type_complexity, reason = "stream markers")]
692 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
693 &self,
694 from: &External<L>,
695 port_hint: NetworkHint,
696 ) -> (
697 ExternalBytesPort<NotMany>,
698 Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
699 ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
700 )
701 where
702 Self: TopLevel<'a> + Sized,
703 {
704 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
705 let target_consistency = self.drop_consistency();
706
707 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
708 T,
709 Self::DropConsistency,
710 Unbounded,
711 TotalOrder,
712 ExactlyOnce,
713 >>();
714 let mut flow_state_borrow = self.flow_state().borrow_mut();
715
716 flow_state_borrow.push_root(HydroRoot::SendExternal {
717 to_external_key: from.key,
718 to_port_id: next_external_port_id,
719 to_many: false,
720 unpaired: false,
721 serialize_fn: None,
722 instantiate_fn: DebugInstantiate::Building,
723 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
724 op_metadata: HydroIrOpMetadata::new(),
725 });
726
727 let raw_stream: Stream<
728 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
729 Self::DropConsistency,
730 Unbounded,
731 TotalOrder,
732 ExactlyOnce,
733 > = Stream::new(
734 target_consistency.clone(),
735 HydroNode::ExternalInput {
736 from_external_key: from.key,
737 from_port_id: next_external_port_id,
738 from_many: false,
739 codec_type: quote_type::<Codec>().into(),
740 port_hint,
741 instantiate_fn: DebugInstantiate::Building,
742 deserialize_fn: None,
743 metadata: target_consistency.new_node_metadata(Stream::<
744 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
745 Self::DropConsistency,
746 Unbounded,
747 TotalOrder,
748 ExactlyOnce,
749 >::collection_kind(
750 )),
751 },
752 );
753
754 (
755 ExternalBytesPort {
756 process_key: from.key,
757 port_id: next_external_port_id,
758 _phantom: PhantomData,
759 },
760 raw_stream.flatten_ordered(),
761 fwd_ref,
762 )
763 }
764
765 #[expect(clippy::type_complexity, reason = "stream markers")]
775 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
776 &self,
777 from: &External<L>,
778 ) -> (
779 ExternalBincodeBidi<InT, OutT, NotMany>,
780 Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
781 ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
782 )
783 where
784 Self: TopLevel<'a> + Sized,
785 {
786 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
787
788 let target_consistency = self.drop_consistency();
789 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
790 OutT,
791 Self::DropConsistency,
792 Unbounded,
793 TotalOrder,
794 ExactlyOnce,
795 >>();
796 let mut flow_state_borrow = self.flow_state().borrow_mut();
797
798 let root = get_this_crate();
799
800 let out_t_type = quote_type::<OutT>();
801 let ser_fn: syn::Expr = syn::parse_quote! {
802 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
803 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
804 )
805 };
806
807 flow_state_borrow.push_root(HydroRoot::SendExternal {
808 to_external_key: from.key,
809 to_port_id: next_external_port_id,
810 to_many: false,
811 unpaired: false,
812 serialize_fn: Some(ser_fn.into()),
813 instantiate_fn: DebugInstantiate::Building,
814 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
815 op_metadata: HydroIrOpMetadata::new(),
816 });
817
818 let in_t_type = quote_type::<InT>();
819
820 let deser_fn: syn::Expr = syn::parse_quote! {
821 |res| {
822 let b = res.unwrap();
823 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
824 }
825 };
826
827 let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
828 Stream::new(
829 target_consistency.clone(),
830 HydroNode::ExternalInput {
831 from_external_key: from.key,
832 from_port_id: next_external_port_id,
833 from_many: false,
834 codec_type: quote_type::<LengthDelimitedCodec>().into(),
835 port_hint: NetworkHint::Auto,
836 instantiate_fn: DebugInstantiate::Building,
837 deserialize_fn: Some(deser_fn.into()),
838 metadata: target_consistency.new_node_metadata(Stream::<
839 InT,
840 Self::DropConsistency,
841 Unbounded,
842 TotalOrder,
843 ExactlyOnce,
844 >::collection_kind(
845 )),
846 },
847 );
848
849 (
850 ExternalBincodeBidi {
851 process_key: from.key,
852 port_id: next_external_port_id,
853 _phantom: PhantomData,
854 },
855 raw_stream,
856 fwd_ref,
857 )
858 }
859
860 #[expect(clippy::type_complexity, reason = "stream markers")]
872 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
873 &self,
874 from: &External<L>,
875 port_hint: NetworkHint,
876 ) -> (
877 ExternalBytesPort<Many>,
878 KeyedStream<
879 u64,
880 <Codec as Decoder>::Item,
881 Self::DropConsistency,
882 Unbounded,
883 TotalOrder,
884 ExactlyOnce,
885 >,
886 KeyedStream<
887 u64,
888 MembershipEvent,
889 Self::DropConsistency,
890 Unbounded,
891 TotalOrder,
892 ExactlyOnce,
893 >,
894 ForwardHandle<
895 'a,
896 KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
897 >,
898 )
899 where
900 Self: TopLevel<'a> + Sized,
901 {
902 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
903
904 let target_consistency = self.drop_consistency();
905 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
906 u64,
907 T,
908 Self::DropConsistency,
909 Unbounded,
910 NoOrder,
911 ExactlyOnce,
912 >>();
913 let mut flow_state_borrow = self.flow_state().borrow_mut();
914
915 flow_state_borrow.push_root(HydroRoot::SendExternal {
916 to_external_key: from.key,
917 to_port_id: next_external_port_id,
918 to_many: true,
919 unpaired: false,
920 serialize_fn: None,
921 instantiate_fn: DebugInstantiate::Building,
922 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
923 op_metadata: HydroIrOpMetadata::new(),
924 });
925
926 let raw_stream: Stream<
927 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
928 Self::DropConsistency,
929 Unbounded,
930 TotalOrder,
931 ExactlyOnce,
932 > = Stream::new(
933 target_consistency.clone(),
934 HydroNode::ExternalInput {
935 from_external_key: from.key,
936 from_port_id: next_external_port_id,
937 from_many: true,
938 codec_type: quote_type::<Codec>().into(),
939 port_hint,
940 instantiate_fn: DebugInstantiate::Building,
941 deserialize_fn: None,
942 metadata: target_consistency.new_node_metadata(Stream::<
943 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
944 Self::DropConsistency,
945 Unbounded,
946 TotalOrder,
947 ExactlyOnce,
948 >::collection_kind(
949 )),
950 },
951 );
952
953 let membership_stream_ident = syn::Ident::new(
954 &format!(
955 "__hydro_deploy_many_{}_{}_membership",
956 from.key, next_external_port_id
957 ),
958 Span::call_site(),
959 );
960 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
961 let raw_membership_stream: KeyedStream<
962 u64,
963 bool,
964 Self::DropConsistency,
965 Unbounded,
966 TotalOrder,
967 ExactlyOnce,
968 > = KeyedStream::new(
969 target_consistency.clone(),
970 HydroNode::Source {
971 source: HydroSource::Stream(membership_stream_expr.into()),
972 metadata: target_consistency.new_node_metadata(KeyedStream::<
973 u64,
974 bool,
975 Self::DropConsistency,
976 Unbounded,
977 TotalOrder,
978 ExactlyOnce,
979 >::collection_kind(
980 )),
981 },
982 );
983
984 (
985 ExternalBytesPort {
986 process_key: from.key,
987 port_id: next_external_port_id,
988 _phantom: PhantomData,
989 },
990 raw_stream
991 .flatten_ordered() .into_keyed(),
993 raw_membership_stream.map(q!(|join| {
994 if join {
995 MembershipEvent::Joined
996 } else {
997 MembershipEvent::Left
998 }
999 })),
1000 fwd_ref,
1001 )
1002 }
1003
1004 #[expect(clippy::type_complexity, reason = "stream markers")]
1020 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1021 &self,
1022 from: &External<L>,
1023 ) -> (
1024 ExternalBincodeBidi<InT, OutT, Many>,
1025 KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1026 KeyedStream<
1027 u64,
1028 MembershipEvent,
1029 Self::DropConsistency,
1030 Unbounded,
1031 TotalOrder,
1032 ExactlyOnce,
1033 >,
1034 ForwardHandle<
1035 'a,
1036 KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1037 >,
1038 )
1039 where
1040 Self: TopLevel<'a> + Sized,
1041 {
1042 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1043
1044 let target_consistency = self.drop_consistency();
1045 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1046 u64,
1047 OutT,
1048 Self::DropConsistency,
1049 Unbounded,
1050 NoOrder,
1051 ExactlyOnce,
1052 >>();
1053 let mut flow_state_borrow = self.flow_state().borrow_mut();
1054
1055 let root = get_this_crate();
1056
1057 let out_t_type = quote_type::<OutT>();
1058 let ser_fn: syn::Expr = syn::parse_quote! {
1059 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1060 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1061 )
1062 };
1063
1064 flow_state_borrow.push_root(HydroRoot::SendExternal {
1065 to_external_key: from.key,
1066 to_port_id: next_external_port_id,
1067 to_many: true,
1068 unpaired: false,
1069 serialize_fn: Some(ser_fn.into()),
1070 instantiate_fn: DebugInstantiate::Building,
1071 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1072 op_metadata: HydroIrOpMetadata::new(),
1073 });
1074
1075 let in_t_type = quote_type::<InT>();
1076
1077 let deser_fn: syn::Expr = syn::parse_quote! {
1078 |res| {
1079 let (id, b) = res.unwrap();
1080 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1081 }
1082 };
1083
1084 let raw_stream: KeyedStream<
1085 u64,
1086 InT,
1087 Self::DropConsistency,
1088 Unbounded,
1089 TotalOrder,
1090 ExactlyOnce,
1091 > = KeyedStream::new(
1092 target_consistency.clone(),
1093 HydroNode::ExternalInput {
1094 from_external_key: from.key,
1095 from_port_id: next_external_port_id,
1096 from_many: true,
1097 codec_type: quote_type::<LengthDelimitedCodec>().into(),
1098 port_hint: NetworkHint::Auto,
1099 instantiate_fn: DebugInstantiate::Building,
1100 deserialize_fn: Some(deser_fn.into()),
1101 metadata: target_consistency.new_node_metadata(KeyedStream::<
1102 u64,
1103 InT,
1104 Self::DropConsistency,
1105 Unbounded,
1106 TotalOrder,
1107 ExactlyOnce,
1108 >::collection_kind(
1109 )),
1110 },
1111 );
1112
1113 let membership_stream_ident = syn::Ident::new(
1114 &format!(
1115 "__hydro_deploy_many_{}_{}_membership",
1116 from.key, next_external_port_id
1117 ),
1118 Span::call_site(),
1119 );
1120 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1121 let raw_membership_stream: KeyedStream<
1122 u64,
1123 bool,
1124 Self::DropConsistency,
1125 Unbounded,
1126 TotalOrder,
1127 ExactlyOnce,
1128 > = KeyedStream::new(
1129 target_consistency.clone(),
1130 HydroNode::Source {
1131 source: HydroSource::Stream(membership_stream_expr.into()),
1132 metadata: target_consistency.new_node_metadata(KeyedStream::<
1133 u64,
1134 bool,
1135 Self::DropConsistency,
1136 Unbounded,
1137 TotalOrder,
1138 ExactlyOnce,
1139 >::collection_kind(
1140 )),
1141 },
1142 );
1143
1144 (
1145 ExternalBincodeBidi {
1146 process_key: from.key,
1147 port_id: next_external_port_id,
1148 _phantom: PhantomData,
1149 },
1150 raw_stream,
1151 raw_membership_stream.map(q!(|join| {
1152 if join {
1153 MembershipEvent::Joined
1154 } else {
1155 MembershipEvent::Left
1156 }
1157 })),
1158 fwd_ref,
1159 )
1160 }
1161
1162 #[expect(clippy::type_complexity, reason = "stream markers")]
1215 fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1216 &self,
1217 sidecar: impl QuotedWithContext<'a, F, Self>,
1218 ) -> (
1219 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1220 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1221 )
1222 where
1223 Self: Sized + TopLevel<'a>,
1224 {
1225 let location_key = Location::id(self).key();
1226
1227 let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1228 let (stream_ident, sink_ident) = sidecar_id.idents();
1229
1230 let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1231 self.flow_state()
1232 .borrow_mut()
1233 .sidecars
1234 .push(crate::compile::builder::Sidecar::Bidi {
1235 location_key,
1236 sidecar_id,
1237 sidecar_closure: Box::new(sidecar_closure),
1238 });
1239
1240 let source_expr: syn::Expr = parse_quote! {
1242 #stream_ident
1243 };
1244 let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1245 self.clone(),
1246 HydroNode::Source {
1247 source: HydroSource::Stream(source_expr.into()),
1248 metadata: self.new_node_metadata(Stream::<
1249 InT,
1250 Self,
1251 Unbounded, TotalOrder, ExactlyOnce,
1254 >::collection_kind()),
1255 },
1256 );
1257
1258 let (fwd_ref, to_sink): (
1260 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1261 Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1262 ) = self.forward_ref();
1263
1264 let sink_expr: syn::Expr = parse_quote! {
1265 #sink_ident
1266 };
1267
1268 let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1269 self.flow_state()
1270 .borrow_mut()
1271 .try_push_root(HydroRoot::DestSink {
1272 sink: sink_expr.into(),
1273 input: Box::new(sink_input_ir),
1274 op_metadata: HydroIrOpMetadata::new(),
1275 });
1276
1277 (inbound, fwd_ref)
1278 }
1279
1280 fn singleton<T>(
1300 &self,
1301 e: impl QuotedWithContext<'a, T, Self>,
1302 ) -> Singleton<T, Self::DropConsistency, Bounded>
1303 where
1304 Self: Sized,
1305 {
1306 let e = e.splice_untyped_ctx(self);
1307
1308 let target_location = self.drop_consistency();
1309 Singleton::new(
1310 target_location.clone(),
1311 HydroNode::SingletonSource {
1312 value: e.into(),
1313 first_tick_only: false,
1314 metadata: target_location.new_node_metadata(Singleton::<
1315 T,
1316 Self::DropConsistency,
1317 Bounded,
1318 >::collection_kind()),
1319 },
1320 )
1321 }
1322
1323 fn singleton_future<F>(
1346 &self,
1347 e: impl QuotedWithContext<'a, F, Self>,
1348 ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1349 where
1350 F: Future,
1351 Self: Sized,
1352 {
1353 self.singleton(e).resolve_future_blocking()
1354 }
1355
1356 fn source_interval(
1365 &self,
1366 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1367 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1368 where
1369 Self: TopLevel<'a> + Sized,
1370 {
1371 self.source_stream(q!(tokio_stream::StreamExt::map(
1372 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1373 |_| ()
1374 )))
1375 .assert_has_consistency_of_trusted(
1376 manual_proof!(),
1377 )
1378 }
1379
1380 fn source_interval_delayed(
1387 &self,
1388 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1389 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1390 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1391 where
1392 Self: TopLevel<'a> + Sized,
1393 {
1394 self.source_stream(q!(tokio_stream::StreamExt::map(
1395 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1396 tokio::time::Instant::now() + delay,
1397 interval,
1398 )),
1399 |_| ()
1400 )))
1401 .assert_has_consistency_of_trusted(
1402 manual_proof!(),
1403 )
1404 }
1405
1406 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1446 where
1447 S: CycleCollection<'a, ForwardRef, Location = Self>,
1448 {
1449 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1450 (
1451 ForwardHandle::new(cycle_id, Location::id(self)),
1452 S::create_source(cycle_id, self.clone()),
1453 )
1454 }
1455}
1456
1457#[cfg(feature = "deploy")]
1458#[cfg(test)]
1459mod tests {
1460 use std::collections::HashSet;
1461
1462 use futures::{SinkExt, StreamExt};
1463 use hydro_deploy::Deployment;
1464 use stageleft::q;
1465 use tokio_util::codec::LengthDelimitedCodec;
1466
1467 use crate::compile::builder::FlowBuilder;
1468 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1469 use crate::location::{Location, NetworkHint};
1470 use crate::nondet::nondet;
1471
1472 #[tokio::test]
1473 async fn top_level_singleton_replay_cardinality() {
1474 let mut deployment = Deployment::new();
1475
1476 let mut flow = FlowBuilder::new();
1477 let node = flow.process::<()>();
1478 let external = flow.external::<()>();
1479
1480 let (in_port, input) =
1481 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1482 let singleton = node.singleton(q!(123));
1483 let tick = node.tick();
1484 let out = input
1485 .batch(&tick, nondet!())
1486 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1487 .cross_singleton(
1488 singleton
1489 .snapshot(&tick, nondet!())
1490 .into_stream()
1491 .count(),
1492 )
1493 .all_ticks()
1494 .send_bincode_external(&external);
1495
1496 let nodes = flow
1497 .with_process(&node, deployment.Localhost())
1498 .with_external(&external, deployment.Localhost())
1499 .deploy(&mut deployment);
1500
1501 deployment.deploy().await.unwrap();
1502
1503 let mut external_in = nodes.connect(in_port).await;
1504 let mut external_out = nodes.connect(out).await;
1505
1506 deployment.start().await.unwrap();
1507
1508 external_in.send(1).await.unwrap();
1509 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1510
1511 external_in.send(2).await.unwrap();
1512 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1513 }
1514
1515 #[tokio::test]
1516 async fn tick_singleton_replay_cardinality() {
1517 let mut deployment = Deployment::new();
1518
1519 let mut flow = FlowBuilder::new();
1520 let node = flow.process::<()>();
1521 let external = flow.external::<()>();
1522
1523 let (in_port, input) =
1524 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1525 let tick = node.tick();
1526 let singleton = tick.singleton(q!(123));
1527 let out = input
1528 .batch(&tick, nondet!())
1529 .cross_singleton(singleton.clone())
1530 .cross_singleton(singleton.into_stream().count())
1531 .all_ticks()
1532 .send_bincode_external(&external);
1533
1534 let nodes = flow
1535 .with_process(&node, deployment.Localhost())
1536 .with_external(&external, deployment.Localhost())
1537 .deploy(&mut deployment);
1538
1539 deployment.deploy().await.unwrap();
1540
1541 let mut external_in = nodes.connect(in_port).await;
1542 let mut external_out = nodes.connect(out).await;
1543
1544 deployment.start().await.unwrap();
1545
1546 external_in.send(1).await.unwrap();
1547 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1548
1549 external_in.send(2).await.unwrap();
1550 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1551 }
1552
1553 #[tokio::test]
1554 async fn external_bytes() {
1555 let mut deployment = Deployment::new();
1556
1557 let mut flow = FlowBuilder::new();
1558 let first_node = flow.process::<()>();
1559 let external = flow.external::<()>();
1560
1561 let (in_port, input) = first_node.source_external_bytes(&external);
1562 let out = input.send_bincode_external(&external);
1563
1564 let nodes = flow
1565 .with_process(&first_node, deployment.Localhost())
1566 .with_external(&external, deployment.Localhost())
1567 .deploy(&mut deployment);
1568
1569 deployment.deploy().await.unwrap();
1570
1571 let mut external_in = nodes.connect(in_port).await.1;
1572 let mut external_out = nodes.connect(out).await;
1573
1574 deployment.start().await.unwrap();
1575
1576 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1577
1578 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1579 }
1580
1581 #[tokio::test]
1582 async fn multi_external_source() {
1583 let mut deployment = Deployment::new();
1584
1585 let mut flow = FlowBuilder::new();
1586 let first_node = flow.process::<()>();
1587 let external = flow.external::<()>();
1588
1589 let (in_port, input, _membership, complete_sink) =
1590 first_node.bidi_external_many_bincode(&external);
1591 let out = input.entries().send_bincode_external(&external);
1592 complete_sink.complete(
1593 first_node
1594 .source_iter::<(u64, ()), _>(q!([]))
1595 .into_keyed()
1596 .weaken_ordering(),
1597 );
1598
1599 let nodes = flow
1600 .with_process(&first_node, deployment.Localhost())
1601 .with_external(&external, deployment.Localhost())
1602 .deploy(&mut deployment);
1603
1604 deployment.deploy().await.unwrap();
1605
1606 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1607 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1608 let external_out = nodes.connect(out).await;
1609
1610 deployment.start().await.unwrap();
1611
1612 external_in_1.send(123).await.unwrap();
1613 external_in_2.send(456).await.unwrap();
1614
1615 assert_eq!(
1616 external_out.take(2).collect::<HashSet<_>>().await,
1617 vec![(0, 123), (1, 456)].into_iter().collect()
1618 );
1619 }
1620
1621 #[tokio::test]
1622 async fn second_connection_only_multi_source() {
1623 let mut deployment = Deployment::new();
1624
1625 let mut flow = FlowBuilder::new();
1626 let first_node = flow.process::<()>();
1627 let external = flow.external::<()>();
1628
1629 let (in_port, input, _membership, complete_sink) =
1630 first_node.bidi_external_many_bincode(&external);
1631 let out = input.entries().send_bincode_external(&external);
1632 complete_sink.complete(
1633 first_node
1634 .source_iter::<(u64, ()), _>(q!([]))
1635 .into_keyed()
1636 .weaken_ordering(),
1637 );
1638
1639 let nodes = flow
1640 .with_process(&first_node, deployment.Localhost())
1641 .with_external(&external, deployment.Localhost())
1642 .deploy(&mut deployment);
1643
1644 deployment.deploy().await.unwrap();
1645
1646 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1648 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1649 let mut external_out = nodes.connect(out).await;
1650
1651 deployment.start().await.unwrap();
1652
1653 external_in_2.send(456).await.unwrap();
1654
1655 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1656 }
1657
1658 #[tokio::test]
1659 async fn multi_external_bytes() {
1660 let mut deployment = Deployment::new();
1661
1662 let mut flow = FlowBuilder::new();
1663 let first_node = flow.process::<()>();
1664 let external = flow.external::<()>();
1665
1666 let (in_port, input, _membership, complete_sink) = first_node
1667 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1668 let out = input.entries().send_bincode_external(&external);
1669 complete_sink.complete(
1670 first_node
1671 .source_iter(q!([]))
1672 .into_keyed()
1673 .weaken_ordering(),
1674 );
1675
1676 let nodes = flow
1677 .with_process(&first_node, deployment.Localhost())
1678 .with_external(&external, deployment.Localhost())
1679 .deploy(&mut deployment);
1680
1681 deployment.deploy().await.unwrap();
1682
1683 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1684 let mut external_in_2 = nodes.connect(in_port).await.1;
1685 let external_out = nodes.connect(out).await;
1686
1687 deployment.start().await.unwrap();
1688
1689 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1690 external_in_2.send(vec![4, 5].into()).await.unwrap();
1691
1692 assert_eq!(
1693 external_out.take(2).collect::<HashSet<_>>().await,
1694 vec![
1695 (0, (&[1u8, 2, 3] as &[u8]).into()),
1696 (1, (&[4u8, 5] as &[u8]).into())
1697 ]
1698 .into_iter()
1699 .collect()
1700 );
1701 }
1702
1703 #[tokio::test]
1704 async fn single_client_external_bytes() {
1705 let mut deployment = Deployment::new();
1706 let mut flow = FlowBuilder::new();
1707 let first_node = flow.process::<()>();
1708 let external = flow.external::<()>();
1709 let (port, input, complete_sink) = first_node
1710 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1711 complete_sink.complete(input.map(q!(|data| {
1712 let mut resp: Vec<u8> = data.into();
1713 resp.push(42);
1714 resp.into() })));
1716
1717 let nodes = flow
1718 .with_process(&first_node, deployment.Localhost())
1719 .with_external(&external, deployment.Localhost())
1720 .deploy(&mut deployment);
1721
1722 deployment.deploy().await.unwrap();
1723 deployment.start().await.unwrap();
1724
1725 let (mut external_out, mut external_in) = nodes.connect(port).await;
1726
1727 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1728 assert_eq!(
1729 external_out.next().await.unwrap().unwrap(),
1730 vec![1, 2, 3, 42]
1731 );
1732 }
1733
1734 #[tokio::test]
1735 async fn echo_external_bytes() {
1736 let mut deployment = Deployment::new();
1737
1738 let mut flow = FlowBuilder::new();
1739 let first_node = flow.process::<()>();
1740 let external = flow.external::<()>();
1741
1742 let (port, input, _membership, complete_sink) = first_node
1743 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1744 complete_sink
1745 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1746
1747 let nodes = flow
1748 .with_process(&first_node, deployment.Localhost())
1749 .with_external(&external, deployment.Localhost())
1750 .deploy(&mut deployment);
1751
1752 deployment.deploy().await.unwrap();
1753
1754 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1755 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1756
1757 deployment.start().await.unwrap();
1758
1759 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1760 external_in_2.send(vec![4, 5].into()).await.unwrap();
1761
1762 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1763 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1764 }
1765
1766 #[tokio::test]
1767 async fn echo_external_bincode() {
1768 let mut deployment = Deployment::new();
1769
1770 let mut flow = FlowBuilder::new();
1771 let first_node = flow.process::<()>();
1772 let external = flow.external::<()>();
1773
1774 let (port, input, _membership, complete_sink) =
1775 first_node.bidi_external_many_bincode(&external);
1776 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1777
1778 let nodes = flow
1779 .with_process(&first_node, deployment.Localhost())
1780 .with_external(&external, deployment.Localhost())
1781 .deploy(&mut deployment);
1782
1783 deployment.deploy().await.unwrap();
1784
1785 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1786 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1787
1788 deployment.start().await.unwrap();
1789
1790 external_in_1.send("hi".to_owned()).await.unwrap();
1791 external_in_2.send("hello".to_owned()).await.unwrap();
1792
1793 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1794 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1795 }
1796
1797 #[tokio::test]
1798 async fn closure_location_name() {
1799 let mut deployment = Deployment::new();
1800 let mut flow = FlowBuilder::new();
1801
1802 enum ClosureProcess {}
1803
1804 let node = flow.process::<ClosureProcess>();
1805 let external = flow.external::<()>();
1806
1807 let (in_port, input) =
1808 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1809 let out = input.send_bincode_external(&external);
1810
1811 let nodes = flow
1812 .with_process(&node, deployment.Localhost())
1813 .with_external(&external, deployment.Localhost())
1814 .deploy(&mut deployment);
1815
1816 deployment.deploy().await.unwrap();
1817
1818 let mut external_in = nodes.connect(in_port).await;
1819 let mut external_out = nodes.connect(out).await;
1820
1821 deployment.start().await.unwrap();
1822
1823 external_in.send(42).await.unwrap();
1824 assert_eq!(external_out.next().await.unwrap(), 42);
1825 }
1826}