11use std:: ops:: { Deref , DerefMut } ;
22use std:: sync:: Arc ;
3+ use anyhow:: anyhow;
34use boringtun:: noise:: rate_limiter:: RateLimiter ;
45use socket2:: Domain ;
56use tokio:: net:: UdpSocket ;
@@ -11,8 +12,8 @@ use crate::device::{HANDSHAKE_RATE_LIMIT, MAX_UDP_SIZE};
1112use crate :: device:: peer:: AllowedIP ;
1213use crate :: device:: tun:: { create_async_tun, ReadPart , WritePart } ;
1314use crate :: device:: script_run:: { run_opt_script, Scripts } ;
14- use crate :: device:: tunnel:: create_udp_socket;
15- use crate :: protobuf:: config:: Protocol ;
15+ use crate :: device:: tunnel:: { create_tcp_server , create_udp_socket} ;
16+ use crate :: protobuf:: config:: { NodeType , Protocol } ;
1617
1718
1819pub struct Device {
@@ -32,25 +33,60 @@ impl Device {
3233 mtu : u32 ,
3334 scripts : Scripts ,
3435 protocol : Protocol ,
36+ node_type : NodeType ,
3537 ) -> anyhow:: Result < Self > {
3638 run_opt_script ( & scripts. pre_up ) ?;
3739
38- let ( mut iface_reader, iface_writer, name) = create_async_tun ( name, mtu, address) ?;
39-
40-
41- let udp4 = Arc :: new ( create_udp_socket ( port, Domain :: IPV4 , None ) ?) ;
42-
43- let port = udp4. local_addr ( ) ?. port ( ) ;
44- let udp6 = Arc :: new ( create_udp_socket ( Some ( port) , Domain :: IPV6 , None ) ?) ;
45-
40+ let ( iface_reader, iface_writer, name) = create_async_tun ( name, mtu, address) ?;
41+ let iface_writer = Arc :: new ( Mutex :: new ( iface_writer) ) ;
4642
4743 let rate_limiter = Arc :: new ( RateLimiter :: new ( & key_pair. 1 , HANDSHAKE_RATE_LIMIT ) ) ;
4844 let peers: Arc < RwLock < Peers > > = Arc :: new ( RwLock :: new ( Peers :: default ( ) ) ) ;
45+ let peers1 = peers. clone ( ) ;
46+ let key_pair1 = key_pair. clone ( ) ;
47+ let ( read_task, write_task, port) = match protocol {
48+ Protocol :: Udp => {
49+ let udp4 = Arc :: new ( create_udp_socket ( port, Domain :: IPV4 , None ) ?) ;
50+ let port = udp4. local_addr ( ) ?. port ( ) ;
51+ let udp6 = Arc :: new ( create_udp_socket ( Some ( port) , Domain :: IPV6 , None ) ?) ;
52+
53+ let read_task = udp_read_tun ( iface_reader, peers. clone ( ) , udp4. clone ( ) , udp6. clone ( ) ) ;
54+ let write_task = udp_and_timer ( key_pair. clone ( ) , peers. clone ( ) , rate_limiter. clone ( ) , udp4. clone ( ) , udp6. clone ( ) , iface_writer) ;
55+ ( read_task, write_task, port)
56+ } ,
57+ Protocol :: Tcp => {
58+ let ip = address[ 0 ] . addr . clone ( ) ;
59+ let tcp6 = create_tcp_server ( port, Domain :: IPV6 , None ) ?;
60+ let port = tcp6. local_addr ( ) ?. port ( ) ;
61+ let key_pair = Arc :: new ( key_pair) ;
62+ let read_task = tcp_read_tun ( iface_reader, peers. clone ( ) ) ;
63+ let write_task: JoinHandle < ( ) > = tokio:: spawn ( async move {
64+ loop {
65+ tokio:: select! {
66+ _ = device:: rate_limiter_timer( & rate_limiter) => { }
67+ _ = device:: tcp_peers_timer(
68+ & ip,
69+ & peers,
70+ key_pair. clone( ) ,
71+ rate_limiter. clone( ) ,
72+ iface_writer. clone( ) ,
73+ false ,
74+ node_type,
75+ ) => { }
76+ _ = device:: tcp_listener_handler( & tcp6, key_pair. clone( ) , rate_limiter. clone( ) , Arc :: clone( & peers) , iface_writer. clone( ) , false ) => {
77+ break ;
78+ }
79+ }
80+ }
81+ } ) ;
82+ ( read_task, write_task, port)
83+ }
84+ } ;
4985
50- let read_task = read_tun ( iface_reader, peers. clone ( ) , udp4. clone ( ) , udp6. clone ( ) ) ;
51- let write_task = udp_and_timer ( key_pair. clone ( ) , peers. clone ( ) , rate_limiter. clone ( ) , udp4. clone ( ) , udp6. clone ( ) , Arc :: new ( Mutex :: new ( iface_writer) ) ) ;
86+ // let read_task = read_tun(iface_reader, peers.clone(), udp4.clone(), udp6.clone());
87+ // let write_task = udp_and_timer(key_pair.clone(), peers.clone(), rate_limiter.clone(), udp4.clone(), udp6.clone(), Arc::new(Mutex::new(iface_writer)));
5288 let device = Self {
53- device_data : DeviceData :: new ( name, peers , key_pair , port, scripts) ,
89+ device_data : DeviceData :: new ( name, peers1 , key_pair1 , port, scripts, node_type ) ,
5490 read_task,
5591 write_task,
5692 protocol,
@@ -66,7 +102,7 @@ impl Device {
66102 }
67103}
68104
69- fn read_tun ( mut read_tun : ReadPart , peers : Arc < RwLock < Peers > > , udp4 : Arc < UdpSocket > , udp6 : Arc < UdpSocket > ) -> JoinHandle < ( ) > {
105+ fn udp_read_tun ( mut read_tun : ReadPart , peers : Arc < RwLock < Peers > > , udp4 : Arc < UdpSocket > , udp6 : Arc < UdpSocket > ) -> JoinHandle < ( ) > {
70106 return tokio:: spawn ( async move {
71107 let mut dst_buf: Vec < u8 > = vec ! [ 0 ; MAX_UDP_SIZE ] ;
72108 let mut src_buf: Vec < u8 > = vec ! [ 0 ; MAX_UDP_SIZE ] ;
@@ -77,6 +113,16 @@ fn read_tun(mut read_tun: ReadPart, peers: Arc<RwLock<Peers>>, udp4: Arc<UdpSock
77113 } ) ;
78114}
79115
116+ fn tcp_read_tun ( mut read_tun : ReadPart , peers : Arc < RwLock < Peers > > ) ->JoinHandle < ( ) > {
117+ return tokio:: spawn ( async move {
118+ let mut dst_buf: Vec < u8 > = vec ! [ 0 ; MAX_UDP_SIZE ] ;
119+ let mut src_buf: Vec < u8 > = vec ! [ 0 ; MAX_UDP_SIZE ] ;
120+ while let Ok ( size) = read_tun. read ( & mut src_buf) {
121+ device:: tun_read_tcp_handle ( & peers, & src_buf[ ..size] , & mut dst_buf) . await ;
122+ }
123+ } )
124+ }
125+
80126fn udp_and_timer (
81127 key_pair : ( x25519_dalek:: StaticSecret , x25519_dalek:: PublicKey ) ,
82128 peers : Arc < RwLock < Peers > > ,
0 commit comments