2017-12-29 04:16:20 +00:00
package yggdrasil
// This sends packets to peers using TCP as a transport
// It's generally better tested than the UDP implementation
// Using it regularly is insane, but I find TCP easier to test/debug with it
// Updating and optimizing the UDP version is a higher priority
// TODO:
// Something needs to make sure we're getting *valid* packets
// Could be used to DoS (connect, give someone else's keys, spew garbage)
// I guess the "peer" part should watch for link packets, disconnect?
2018-06-09 22:46:19 +00:00
// TCP connections start with a metadata exchange.
// It involves exchanging version numbers and crypto keys
// See version.go for version metadata format
2018-06-12 22:50:08 +00:00
import (
2019-01-13 18:08:41 +00:00
"context"
2018-06-12 22:50:08 +00:00
"fmt"
2018-06-21 15:39:43 +00:00
"math/rand"
2018-06-12 22:50:08 +00:00
"net"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/proxy"
2018-12-15 02:49:18 +00:00
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
2018-06-12 22:50:08 +00:00
)
2017-12-29 04:16:20 +00:00
2019-01-19 00:14:10 +00:00
const default_timeout = 6 * time . Second
const tcp_ping_interval = ( default_timeout * 2 / 3 )
2017-12-29 04:16:20 +00:00
2018-06-10 23:03:28 +00:00
// The TCP listener and information about active TCP connections, to avoid duplication.
2017-12-29 04:16:20 +00:00
type tcpInterface struct {
2018-07-29 14:30:13 +00:00
core * Core
2018-12-30 15:21:09 +00:00
reconfigure chan chan error
2018-07-29 14:30:13 +00:00
serv net . Listener
2019-01-19 00:14:10 +00:00
stop chan bool
timeout time . Duration
addr string
2018-07-29 14:30:13 +00:00
mutex sync . Mutex // Protecting the below
calls map [ string ] struct { }
conns map [ tcpInfo ] ( chan struct { } )
2019-01-19 00:14:10 +00:00
stream stream
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring.
// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable).
2018-02-18 02:44:23 +00:00
type tcpInfo struct {
2018-12-15 02:49:18 +00:00
box crypto . BoxPubKey
sig crypto . SigPubKey
2018-12-30 21:11:16 +00:00
localAddr string
2018-04-20 12:41:09 +00:00
remoteAddr string
2017-12-29 04:16:20 +00:00
}
2018-12-15 00:15:35 +00:00
// Wrapper function to set additional options for specific connection types.
func ( iface * tcpInterface ) setExtraOptions ( c net . Conn ) {
switch sock := c . ( type ) {
case * net . TCPConn :
sock . SetNoDelay ( true )
// TODO something for socks5
default :
}
}
2018-06-10 23:03:28 +00:00
// Returns the address of the listener.
2018-05-27 21:13:37 +00:00
func ( iface * tcpInterface ) getAddr ( ) * net . TCPAddr {
return iface . serv . Addr ( ) . ( * net . TCPAddr )
}
2018-06-10 23:03:28 +00:00
// Attempts to initiate a connection to the provided address.
2018-09-25 14:32:45 +00:00
func ( iface * tcpInterface ) connect ( addr string , intf string ) {
iface . call ( addr , nil , intf )
2018-05-27 21:13:37 +00:00
}
2018-06-10 23:03:28 +00:00
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address.
2018-05-27 21:13:37 +00:00
func ( iface * tcpInterface ) connectSOCKS ( socksaddr , peeraddr string ) {
2018-09-25 14:32:45 +00:00
iface . call ( peeraddr , & socksaddr , "" )
2018-05-27 21:13:37 +00:00
}
2018-06-10 23:03:28 +00:00
// Initializes the struct.
2018-12-29 19:14:26 +00:00
func ( iface * tcpInterface ) init ( core * Core ) ( err error ) {
2018-01-04 22:37:51 +00:00
iface . core = core
2019-01-19 00:14:10 +00:00
iface . stop = make ( chan bool , 1 )
2018-12-30 15:21:09 +00:00
iface . reconfigure = make ( chan chan error , 1 )
go func ( ) {
for {
2019-01-15 08:51:19 +00:00
e := <- iface . reconfigure
iface . core . configMutex . RLock ( )
updated := iface . core . config . Listen != iface . core . configOld . Listen
iface . core . configMutex . RUnlock ( )
if updated {
2019-01-19 00:14:10 +00:00
iface . stop <- true
2019-01-15 08:51:19 +00:00
iface . serv . Close ( )
e <- iface . listen ( )
} else {
e <- nil
2018-12-30 15:21:09 +00:00
}
}
} ( )
return iface . listen ( )
}
func ( iface * tcpInterface ) listen ( ) error {
var err error
2018-12-29 19:53:31 +00:00
iface . core . configMutex . RLock ( )
2019-01-19 00:14:10 +00:00
iface . addr = iface . core . config . Listen
iface . timeout = time . Duration ( iface . core . config . ReadTimeout ) * time . Millisecond
2018-12-29 19:53:31 +00:00
iface . core . configMutex . RUnlock ( )
2018-12-30 15:21:09 +00:00
2019-01-19 00:14:10 +00:00
if iface . timeout >= 0 && iface . timeout < default_timeout {
iface . timeout = default_timeout
2018-07-29 14:30:13 +00:00
}
2019-01-13 18:08:41 +00:00
ctx := context . Background ( )
lc := net . ListenConfig {
Control : iface . tcpContext ,
}
2019-01-19 00:14:10 +00:00
iface . serv , err = lc . Listen ( ctx , "tcp" , iface . addr )
2018-04-19 14:30:40 +00:00
if err == nil {
2019-01-16 13:20:12 +00:00
iface . mutex . Lock ( )
2018-04-19 14:30:40 +00:00
iface . calls = make ( map [ string ] struct { } )
iface . conns = make ( map [ tcpInfo ] ( chan struct { } ) )
2019-01-16 13:20:12 +00:00
iface . mutex . Unlock ( )
2018-04-19 14:30:40 +00:00
go iface . listener ( )
2018-12-30 15:21:09 +00:00
return nil
2018-01-04 22:37:51 +00:00
}
2018-05-27 21:13:37 +00:00
return err
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Runs the listener, which spawns off goroutines for incoming connections.
2017-12-29 04:16:20 +00:00
func ( iface * tcpInterface ) listener ( ) {
2018-01-04 22:37:51 +00:00
defer iface . serv . Close ( )
2018-03-07 09:41:04 +00:00
iface . core . log . Println ( "Listening for TCP on:" , iface . serv . Addr ( ) . String ( ) )
2018-01-04 22:37:51 +00:00
for {
2018-04-19 14:30:40 +00:00
sock , err := iface . serv . Accept ( )
2019-01-15 08:53:57 +00:00
if err != nil {
iface . core . log . Println ( "Failed to accept connection:" , err )
return
}
2018-12-30 15:21:09 +00:00
select {
2019-01-19 00:14:10 +00:00
case <- iface . stop :
2018-12-30 15:21:09 +00:00
iface . core . log . Println ( "Stopping listener" )
return
default :
if err != nil {
panic ( err )
}
go iface . handler ( sock , true )
2018-01-04 22:37:51 +00:00
}
}
2017-12-29 04:16:20 +00:00
}
2018-12-30 21:11:16 +00:00
// Checks if we already have a connection to this node
func ( iface * tcpInterface ) isAlreadyConnected ( info tcpInfo ) bool {
iface . mutex . Lock ( )
defer iface . mutex . Unlock ( )
_ , isIn := iface . conns [ info ]
return isIn
}
// Checks if we already are calling this address
func ( iface * tcpInterface ) isAlreadyCalling ( saddr string ) bool {
iface . mutex . Lock ( )
defer iface . mutex . Unlock ( )
_ , isIn := iface . calls [ saddr ]
return isIn
}
2018-06-10 23:03:28 +00:00
// Checks if a connection already exists.
// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address.
// If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns.
2018-09-25 14:32:45 +00:00
func ( iface * tcpInterface ) call ( saddr string , socksaddr * string , sintf string ) {
2018-01-04 22:37:51 +00:00
go func ( ) {
2018-09-25 17:05:57 +00:00
callname := saddr
if sintf != "" {
callname = fmt . Sprintf ( "%s/%s" , saddr , sintf )
}
2018-12-30 21:11:16 +00:00
if iface . isAlreadyCalling ( saddr ) {
2018-06-14 14:11:34 +00:00
return
}
2019-01-16 13:20:12 +00:00
iface . mutex . Lock ( )
2018-12-30 21:11:16 +00:00
iface . calls [ callname ] = struct { } { }
2019-01-16 13:20:12 +00:00
iface . mutex . Unlock ( )
2018-12-30 21:11:16 +00:00
defer func ( ) {
// Block new calls for a little while, to mitigate livelock scenarios
2019-01-19 00:14:10 +00:00
time . Sleep ( default_timeout )
2018-12-30 21:11:16 +00:00
time . Sleep ( time . Duration ( rand . Intn ( 1000 ) ) * time . Millisecond )
iface . mutex . Lock ( )
delete ( iface . calls , callname )
iface . mutex . Unlock ( )
} ( )
2018-06-14 14:11:34 +00:00
var conn net . Conn
var err error
2018-06-14 14:21:35 +00:00
if socksaddr != nil {
2018-09-25 18:46:06 +00:00
if sintf != "" {
return
}
2018-06-14 14:21:35 +00:00
var dialer proxy . Dialer
dialer , err = proxy . SOCKS5 ( "tcp" , * socksaddr , nil , proxy . Direct )
if err != nil {
return
}
2018-06-14 14:11:34 +00:00
conn , err = dialer . Dial ( "tcp" , saddr )
if err != nil {
return
}
conn = & wrappedConn {
c : conn ,
raddr : & wrappedAddr {
network : "tcp" ,
addr : saddr ,
} ,
}
} else {
2019-01-13 18:08:41 +00:00
dialer := net . Dialer {
Control : iface . tcpContext ,
}
2018-09-25 14:32:45 +00:00
if sintf != "" {
ief , err := net . InterfaceByName ( sintf )
2018-10-04 11:26:08 +00:00
if err != nil {
return
2019-01-16 14:52:27 +00:00
}
if ief . Flags & net . FlagUp == 0 {
return
}
addrs , err := ief . Addrs ( )
if err == nil {
dst , err := net . ResolveTCPAddr ( "tcp" , saddr )
if err != nil {
2018-09-27 11:14:55 +00:00
return
2018-09-25 18:46:06 +00:00
}
2019-01-16 14:52:27 +00:00
for addrindex , addr := range addrs {
src , _ , err := net . ParseCIDR ( addr . String ( ) )
2018-09-25 14:32:45 +00:00
if err != nil {
2019-01-16 14:52:27 +00:00
continue
2018-09-25 14:32:45 +00:00
}
2019-01-17 23:06:59 +00:00
if src . Equal ( dst . IP ) {
continue
}
if ! src . IsGlobalUnicast ( ) && ! src . IsLinkLocalUnicast ( ) {
continue
}
bothglobal := src . IsGlobalUnicast ( ) == dst . IP . IsGlobalUnicast ( )
bothlinklocal := src . IsLinkLocalUnicast ( ) == dst . IP . IsLinkLocalUnicast ( )
if ! bothglobal && ! bothlinklocal {
continue
}
if ( src . To4 ( ) != nil ) != ( dst . IP . To4 ( ) != nil ) {
continue
}
if bothglobal || bothlinklocal || addrindex == len ( addrs ) - 1 {
dialer . LocalAddr = & net . TCPAddr {
IP : src ,
Port : 0 ,
Zone : sintf ,
2018-09-25 14:32:45 +00:00
}
2019-01-17 23:06:59 +00:00
break
2018-09-25 14:32:45 +00:00
}
2019-01-16 14:52:27 +00:00
}
if dialer . LocalAddr == nil {
return
2018-09-25 14:32:45 +00:00
}
}
}
2019-01-16 14:52:27 +00:00
2018-09-25 14:32:45 +00:00
conn , err = dialer . Dial ( "tcp" , saddr )
2018-01-04 22:37:51 +00:00
if err != nil {
return
}
}
2018-06-14 14:11:34 +00:00
iface . handler ( conn , false )
2018-01-04 22:37:51 +00:00
} ( )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.
// It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout).
2018-05-06 21:32:34 +00:00
func ( iface * tcpInterface ) handler ( sock net . Conn , incoming bool ) {
2018-01-04 22:37:51 +00:00
defer sock . Close ( )
2018-12-15 00:15:35 +00:00
iface . setExtraOptions ( sock )
2018-01-04 22:37:51 +00:00
// Get our keys
2018-12-15 02:49:18 +00:00
myLinkPub , myLinkPriv := crypto . NewBoxKeys ( ) // ephemeral link keys
2018-06-09 22:46:19 +00:00
meta := version_getBaseMetadata ( )
meta . box = iface . core . boxPub
meta . sig = iface . core . sigPub
meta . link = * myLinkPub
metaBytes := meta . encode ( )
_ , err := sock . Write ( metaBytes )
2018-01-04 22:37:51 +00:00
if err != nil {
return
}
2019-01-19 00:14:10 +00:00
if iface . timeout > 0 {
sock . SetReadDeadline ( time . Now ( ) . Add ( iface . timeout ) )
2018-07-29 14:30:13 +00:00
}
2018-06-09 23:38:30 +00:00
_ , err = sock . Read ( metaBytes )
2018-01-04 22:37:51 +00:00
if err != nil {
return
}
2018-06-09 22:46:19 +00:00
meta = version_metadata { } // Reset to zero value
2018-06-09 23:38:30 +00:00
if ! meta . decode ( metaBytes ) || ! meta . check ( ) {
// Failed to decode and check the metadata
// If it's a version mismatch issue, then print an error message
2018-06-09 22:46:19 +00:00
base := version_getBaseMetadata ( )
if meta . meta == base . meta {
if meta . ver > base . ver {
iface . core . log . Println ( "Failed to connect to node:" , sock . RemoteAddr ( ) . String ( ) , "version:" , meta . ver )
} else if meta . ver == base . ver && meta . minorVer > base . minorVer {
iface . core . log . Println ( "Failed to connect to node:" , sock . RemoteAddr ( ) . String ( ) , "version:" , fmt . Sprintf ( "%d.%d" , meta . ver , meta . minorVer ) )
}
}
2018-06-09 23:38:30 +00:00
// TODO? Block forever to prevent future connection attempts? suppress future messages about the same node?
2018-01-04 22:37:51 +00:00
return
}
2018-12-30 21:11:16 +00:00
remoteAddr , _ , e1 := net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
localAddr , _ , e2 := net . SplitHostPort ( sock . LocalAddr ( ) . String ( ) )
if e1 != nil || e2 != nil {
return
}
2018-06-09 22:46:19 +00:00
info := tcpInfo { // used as a map key, so don't include ephemeral link key
2018-12-30 21:11:16 +00:00
box : meta . box ,
sig : meta . sig ,
localAddr : localAddr ,
remoteAddr : remoteAddr ,
}
if iface . isAlreadyConnected ( info ) {
return
2018-06-09 22:46:19 +00:00
}
2018-01-04 22:37:51 +00:00
// Quit the parent call if this is a connection to ourself
equiv := func ( k1 , k2 [ ] byte ) bool {
for idx := range k1 {
if k1 [ idx ] != k2 [ idx ] {
return false
}
}
return true
}
2018-12-30 21:11:16 +00:00
if equiv ( meta . box [ : ] , iface . core . boxPub [ : ] ) {
2018-01-04 22:37:51 +00:00
return
2018-06-12 22:50:08 +00:00
}
2018-12-30 21:11:16 +00:00
if equiv ( meta . sig [ : ] , iface . core . sigPub [ : ] ) {
2018-01-04 22:37:51 +00:00
return
}
2018-05-06 21:32:34 +00:00
// Check if we're authorized to connect to this key / IP
2018-12-30 21:11:16 +00:00
if incoming && ! iface . core . peers . isAllowedEncryptionPublicKey ( & meta . box ) {
2018-05-06 21:32:34 +00:00
// Allow unauthorized peers if they're link-local
raddrStr , _ , _ := net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
raddr := net . ParseIP ( raddrStr )
if ! raddr . IsLinkLocalUnicast ( ) {
return
}
}
2018-02-18 02:44:23 +00:00
// Check if we already have a connection to this node, close and block if yes
iface . mutex . Lock ( )
2018-12-30 21:11:16 +00:00
/ * if blockChan , isIn := iface . conns [ info ] ; isIn {
2018-02-18 02:44:23 +00:00
iface . mutex . Unlock ( )
sock . Close ( )
<- blockChan
return
2018-12-30 21:11:16 +00:00
} * /
2018-02-18 02:44:23 +00:00
blockChan := make ( chan struct { } )
iface . conns [ info ] = blockChan
iface . mutex . Unlock ( )
2018-02-20 05:22:36 +00:00
defer func ( ) {
iface . mutex . Lock ( )
delete ( iface . conns , info )
iface . mutex . Unlock ( )
close ( blockChan )
} ( )
2018-01-04 22:37:51 +00:00
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
2018-12-30 21:11:16 +00:00
p := iface . core . peers . newPeer ( & meta . box , & meta . sig , crypto . GetSharedKey ( myLinkPriv , & meta . link ) , sock . RemoteAddr ( ) . String ( ) )
2018-06-06 22:44:10 +00:00
p . linkOut = make ( chan [ ] byte , 1 )
2018-01-04 22:37:51 +00:00
in := func ( bs [ ] byte ) {
2018-06-06 22:44:10 +00:00
p . handlePacket ( bs )
2018-01-04 22:37:51 +00:00
}
2018-06-24 02:51:32 +00:00
out := make ( chan [ ] byte , 1 )
2018-01-04 22:37:51 +00:00
defer close ( out )
go func ( ) {
2018-06-24 02:51:32 +00:00
// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
2018-06-25 01:20:07 +00:00
send := func ( msg [ ] byte ) {
msgLen := wire_encode_uint64 ( uint64 ( len ( msg ) ) )
2019-01-19 00:14:10 +00:00
buf := net . Buffers { streamMsg [ : ] , msgLen , msg }
2018-06-25 01:20:07 +00:00
buf . WriteTo ( sock )
2019-01-19 00:14:10 +00:00
atomic . AddUint64 ( & p . bytesSent , uint64 ( len ( streamMsg ) + len ( msgLen ) + len ( msg ) ) )
2018-12-15 02:49:18 +00:00
util . PutBytes ( msg )
2018-06-25 01:20:07 +00:00
}
2018-07-29 14:30:13 +00:00
timerInterval := tcp_ping_interval
2018-06-07 21:49:51 +00:00
timer := time . NewTimer ( timerInterval )
defer timer . Stop ( )
2018-06-06 22:44:10 +00:00
for {
2018-06-24 02:51:32 +00:00
select {
case msg := <- p . linkOut :
// Always send outgoing link traffic first, if needed
2018-06-25 01:20:07 +00:00
send ( msg )
2018-06-24 02:51:32 +00:00
continue
default :
2018-06-07 20:04:17 +00:00
}
2018-06-24 02:51:32 +00:00
// Otherwise wait reset the timer and wait for something to do
2018-06-07 21:49:51 +00:00
timer . Stop ( )
select {
case <- timer . C :
default :
}
timer . Reset ( timerInterval )
2018-06-06 22:44:10 +00:00
select {
2018-06-07 21:49:51 +00:00
case _ = <- timer . C :
2018-06-25 01:20:07 +00:00
send ( nil ) // TCP keep-alive traffic
2018-06-06 22:44:10 +00:00
case msg := <- p . linkOut :
2018-06-25 01:20:07 +00:00
send ( msg )
2018-06-06 22:44:10 +00:00
case msg , ok := <- out :
if ! ok {
return
}
2018-06-25 01:20:07 +00:00
send ( msg ) // Block until the socket write has finished
2018-06-24 02:51:32 +00:00
// Now inform the switch that we're ready for more traffic
p . core . switchTable . idleIn <- p . port
2018-01-04 22:37:51 +00:00
}
}
} ( )
2018-06-24 02:51:32 +00:00
p . core . switchTable . idleIn <- p . port // Start in the idle state
2018-01-04 22:37:51 +00:00
p . out = func ( msg [ ] byte ) {
defer func ( ) { recover ( ) } ( )
2018-06-23 01:39:57 +00:00
out <- msg
2018-01-04 22:37:51 +00:00
}
2018-05-05 22:14:03 +00:00
p . close = func ( ) { sock . Close ( ) }
2018-06-06 22:44:10 +00:00
go p . linkLoop ( )
2018-01-04 22:37:51 +00:00
defer func ( ) {
// Put all of our cleanup here...
2018-05-05 22:14:03 +00:00
p . core . peers . removePeer ( p . port )
2018-01-04 22:37:51 +00:00
} ( )
2018-09-25 14:32:45 +00:00
us , _ , _ := net . SplitHostPort ( sock . LocalAddr ( ) . String ( ) )
2018-04-20 12:41:09 +00:00
them , _ , _ := net . SplitHostPort ( sock . RemoteAddr ( ) . String ( ) )
2018-12-30 21:11:16 +00:00
themNodeID := crypto . GetNodeID ( & meta . box )
2018-12-15 02:49:18 +00:00
themAddr := address . AddrForNodeID ( themNodeID )
2018-01-04 22:37:51 +00:00
themAddrString := net . IP ( themAddr [ : ] ) . String ( )
themString := fmt . Sprintf ( "%s@%s" , themAddrString , them )
2018-12-30 15:21:09 +00:00
iface . core . log . Printf ( "Connected: %s, source: %s" , themString , us )
2019-01-19 00:14:10 +00:00
iface . stream . init ( )
bs := make ( [ ] byte , 2 * streamMsgSize )
var n int
2018-01-04 22:37:51 +00:00
for {
2019-01-19 00:14:10 +00:00
if iface . timeout > 0 {
sock . SetReadDeadline ( time . Now ( ) . Add ( iface . timeout ) )
2018-07-29 14:30:13 +00:00
}
2019-01-19 00:14:10 +00:00
n , err = sock . Read ( bs )
if err != nil {
break
2018-01-04 22:37:51 +00:00
}
2019-01-19 00:14:10 +00:00
if n > 0 {
iface . stream . write ( bs [ : n ] , in )
2018-01-04 22:37:51 +00:00
}
}
2019-01-19 00:14:10 +00:00
if err == nil {
iface . core . log . Printf ( "Disconnected: %s, source: %s" , themString , us )
} else {
iface . core . log . Printf ( "Disconnected: %s, source: %s, error: %s" , themString , us , err )
2018-01-04 22:37:51 +00:00
}
2019-01-19 00:14:10 +00:00
return
2017-12-29 04:16:20 +00:00
}