5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 20:00:27 +00:00

Add support for specifying TCP source interface, i.e. tcp://a.b.c.d:e/eth0, for multiple simultaneous peerings to the same node over different interfaces

This commit is contained in:
Neil Alexander 2018-09-25 15:32:45 +01:00
parent 4666b8f6cd
commit aecc151baf
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 49 additions and 14 deletions

View File

@ -395,7 +395,11 @@ func (a *admin) addPeer(addr string) error {
if err == nil { if err == nil {
switch strings.ToLower(u.Scheme) { switch strings.ToLower(u.Scheme) {
case "tcp": case "tcp":
a.core.tcp.connect(u.Host) if len(u.Path) > 1 {
a.core.tcp.connect(u.Host, u.Path[1:])
} else {
a.core.tcp.connect(u.Host, "")
}
case "socks": case "socks":
a.core.tcp.connectSOCKS(u.Host, u.Path[1:]) a.core.tcp.connectSOCKS(u.Host, u.Path[1:])
default: default:
@ -407,7 +411,7 @@ func (a *admin) addPeer(addr string) error {
if strings.HasPrefix(addr, "tcp:") { if strings.HasPrefix(addr, "tcp:") {
addr = addr[4:] addr = addr[4:]
} }
a.core.tcp.connect(addr) a.core.tcp.connect(addr, "")
return nil return nil
} }
return nil return nil

View File

@ -153,6 +153,6 @@ func (m *multicast) listen() {
} }
addr.Zone = from.Zone addr.Zone = from.Zone
saddr := addr.String() saddr := addr.String()
m.core.tcp.connect(saddr) m.core.tcp.connect(saddr, "")
} }
} }

View File

@ -64,13 +64,13 @@ func (iface *tcpInterface) getAddr() *net.TCPAddr {
} }
// Attempts to initiate a connection to the provided address. // Attempts to initiate a connection to the provided address.
func (iface *tcpInterface) connect(addr string) { func (iface *tcpInterface) connect(addr string, intf string) {
iface.call(addr, nil) iface.call(addr, nil, intf)
} }
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address. // Attempst to initiate a connection to the provided address, viathe provided socks proxy address.
func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) {
iface.call(peeraddr, &socksaddr) iface.call(peeraddr, &socksaddr, "")
} }
// Initializes the struct. // Initializes the struct.
@ -110,20 +110,21 @@ func (iface *tcpInterface) listener() {
// If the dial is successful, it launches the handler. // If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later. // When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns. // This all happens in a separate goroutine that it spawns.
func (iface *tcpInterface) call(saddr string, socksaddr *string) { func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
go func() { go func() {
callname := fmt.Sprintf("%s/%s", saddr, sintf)
quit := false quit := false
iface.mutex.Lock() iface.mutex.Lock()
if _, isIn := iface.calls[saddr]; isIn { if _, isIn := iface.calls[callname]; isIn {
quit = true quit = true
} else { } else {
iface.calls[saddr] = struct{}{} iface.calls[callname] = struct{}{}
defer func() { defer func() {
// Block new calls for a little while, to mitigate livelock scenarios // Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_tcp_timeout) time.Sleep(default_tcp_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock() iface.mutex.Lock()
delete(iface.calls, saddr) delete(iface.calls, callname)
iface.mutex.Unlock() iface.mutex.Unlock()
}() }()
} }
@ -151,7 +152,36 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string) {
}, },
} }
} else { } else {
conn, err = net.Dial("tcp", saddr) dialer := net.Dialer{}
if sintf != "" {
ief, err := net.InterfaceByName(sintf)
if err == nil {
addrs, err := ief.Addrs()
if err == nil {
dst, err := net.ResolveTCPAddr("tcp", saddr)
if err != nil {
return
}
for _, addr := range addrs {
src, _, err := net.ParseCIDR(addr.String())
if err != nil {
continue
}
if (src.To4() != nil) == (dst.IP.To4() != nil) && src.IsGlobalUnicast() {
dialer.LocalAddr = &net.TCPAddr{
IP: src,
Port: 0,
}
}
}
if dialer.LocalAddr == nil {
iface.core.log.Println("No valid source address found for interface", sintf)
return
}
}
}
}
conn, err = dialer.Dial("tcp", saddr)
if err != nil { if err != nil {
return return
} }
@ -307,17 +337,18 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
// Put all of our cleanup here... // Put all of our cleanup here...
p.core.peers.removePeer(p.port) p.core.peers.removePeer(p.port)
}() }()
us, _, _ := net.SplitHostPort(sock.LocalAddr().String())
them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) them, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
themNodeID := getNodeID(&info.box) themNodeID := getNodeID(&info.box)
themAddr := address_addrForNodeID(themNodeID) themAddr := address_addrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them) themString := fmt.Sprintf("%s@%s", themAddrString, them)
iface.core.log.Println("Connected:", themString) iface.core.log.Println("Connected:", themString, "source", us)
err = iface.reader(sock, in) // In this goroutine, because of defers err = iface.reader(sock, in) // In this goroutine, because of defers
if err == nil { if err == nil {
iface.core.log.Println("Disconnected:", themString) iface.core.log.Println("Disconnected:", themString, "source", us)
} else { } else {
iface.core.log.Println("Disconnected:", themString, "with error:", err) iface.core.log.Println("Disconnected:", themString, "source", us, "with error:", err)
} }
return return
} }