5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2025-01-11 17:45:41 +00:00
yggdrasil-go/src/core/link.go

433 lines
10 KiB
Go
Raw Normal View History

2021-05-23 19:42:26 +00:00
package core
import (
2022-07-24 09:23:25 +00:00
"bytes"
2019-01-31 23:29:18 +00:00
"encoding/hex"
2019-01-05 12:06:45 +00:00
"errors"
"fmt"
"io"
"net"
"net/url"
"strconv"
"strings"
"sync/atomic"
2022-10-02 12:20:39 +00:00
"time"
2022-08-06 14:05:12 +00:00
"github.com/Arceliar/phony"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
)
2020-05-23 15:23:55 +00:00
type links struct {
phony.Inbox
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
_links map[linkInfo]*link // *link is nil if connection in progress
}
// linkInfo is used as a map key
type linkInfo struct {
linkType string // Type of link, e.g. TCP, AWDL
local string // Local name or address
remote string // Remote name or address
}
type linkDial struct {
url *url.URL
sintf string
}
2020-05-23 15:28:57 +00:00
type link struct {
lname string
links *links
conn *linkConn
options linkOptions
info linkInfo
incoming bool
force bool
}
type linkOptions struct {
2021-05-23 19:33:28 +00:00
pinnedEd25519Keys map[keyArray]struct{}
priority uint8
}
type Listener struct {
net.Listener
closed chan struct{}
}
func (l *Listener) Close() error {
err := l.Listener.Close()
<-l.closed
return err
}
2020-05-23 15:23:55 +00:00
func (l *links) init(c *Core) error {
2019-01-04 17:23:37 +00:00
l.core = c
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l._links = make(map[linkInfo]*link)
2022-08-06 14:05:12 +00:00
var listeners []ListenAddress
phony.Block(c, func() {
listeners = make([]ListenAddress, 0, len(c.config._listeners))
for listener := range c.config._listeners {
listeners = append(listeners, listener)
}
})
2019-03-04 17:09:48 +00:00
2019-01-04 17:23:37 +00:00
return nil
}
2022-09-24 16:05:44 +00:00
func (l *links) shutdown() {
phony.Block(l.tcp, func() {
for l := range l.tcp._listeners {
2022-09-24 16:05:44 +00:00
_ = l.Close()
}
})
phony.Block(l.tls, func() {
for l := range l.tls._listeners {
2022-09-24 16:05:44 +00:00
_ = l.Close()
}
})
phony.Block(l.unix, func() {
for l := range l.unix._listeners {
2022-09-24 16:05:44 +00:00
_ = l.Close()
}
})
}
func (l *links) isConnectedTo(info linkInfo) bool {
var isConnected bool
phony.Block(l, func() {
_, isConnected = l._links[info]
})
return isConnected
}
func (l *links) call(u *url.URL, sintf string, errch chan<- error) (info linkInfo, err error) {
info = linkInfoFor(u.Scheme, sintf, u.Host)
if l.isConnectedTo(info) {
2022-11-12 11:56:50 +00:00
if errch != nil {
close(errch) // already connected, no error
}
return info, nil
}
options := linkOptions{
pinnedEd25519Keys: map[keyArray]struct{}{},
}
for _, pubkey := range u.Query()["key"] {
sigPub, err := hex.DecodeString(pubkey)
if err != nil {
2022-11-12 11:56:50 +00:00
if errch != nil {
close(errch)
}
return info, fmt.Errorf("pinned key contains invalid hex characters")
}
var sigPubKey keyArray
copy(sigPubKey[:], sigPub)
options.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
if p := u.Query().Get("priority"); p != "" {
pi, err := strconv.ParseUint(p, 10, 8)
if err != nil {
2022-11-12 11:56:50 +00:00
if errch != nil {
close(errch)
}
return info, fmt.Errorf("priority invalid: %w", err)
}
options.priority = uint8(pi)
}
switch info.linkType {
case "tcp":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tcp.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "socks":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.socks.dial(u, options); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "tls":
2021-08-01 20:36:51 +00:00
// SNI headers must contain hostnames and not IP addresses, so we must make sure
// that we do not populate the SNI with an IP literal. We do this by splitting
// the host-port combo from the query option and then seeing if it parses to an
// IP address successfully or not.
var tlsSNI string
2021-08-01 20:36:51 +00:00
if sni := u.Query().Get("sni"); sni != "" {
2021-08-01 20:39:49 +00:00
if net.ParseIP(sni) == nil {
tlsSNI = sni
2021-08-01 20:36:51 +00:00
}
}
// If the SNI is not configured still because the above failed then we'll try
// again but this time we'll use the host part of the peering URI instead.
if tlsSNI == "" {
if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil {
tlsSNI = host
}
}
go func() {
if errch != nil {
defer close(errch)
}
if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
case "unix":
go func() {
if errch != nil {
defer close(errch)
}
if err := l.unix.dial(u, options, sintf); err != nil && err != io.EOF {
l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err)
if errch != nil {
errch <- err
}
}
}()
default:
2022-11-12 11:56:50 +00:00
if errch != nil {
close(errch)
}
return info, errors.New("unknown call scheme: " + u.Scheme)
}
return info, nil
}
func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
var listener *Listener
var err error
switch u.Scheme {
case "tcp":
listener, err = l.tcp.listen(u, sintf)
case "tls":
listener, err = l.tls.listen(u, sintf)
case "unix":
listener, err = l.unix.listen(u, sintf)
default:
return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme)
}
return listener, err
}
func (l *links) create(conn net.Conn, dial *linkDial, name string, info linkInfo, incoming, force bool, options linkOptions) error {
2020-05-23 15:28:57 +00:00
intf := link{
conn: &linkConn{
Conn: conn,
up: time.Now(),
},
lname: name,
links: l,
options: options,
info: info,
incoming: incoming,
force: force,
2019-01-04 17:23:37 +00:00
}
go func() {
if err := intf.handler(dial); err != nil {
l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err)
}
}()
return nil
}
func (intf *link) handler(dial *linkDial) error {
2022-09-24 16:05:44 +00:00
defer intf.conn.Close() // nolint:errcheck
// Don't connect to this link more than once.
if intf.links.isConnectedTo(intf.info) {
return nil
2019-09-18 15:32:22 +00:00
}
// Mark the connection as in progress.
phony.Block(intf.links, func() {
intf.links._links[intf.info] = nil
})
// When we're done, clean up the connection entry.
defer phony.Block(intf.links, func() {
delete(intf.links._links, intf.info)
})
meta := version_getBaseMetadata()
meta.key = intf.links.core.public
metaBytes := meta.encode()
if err := intf.conn.SetDeadline(time.Now().Add(time.Second * 6)); err != nil {
return fmt.Errorf("failed to set handshake deadline: %w", err)
2019-02-27 03:07:56 +00:00
}
n, err := intf.conn.Write(metaBytes)
switch {
case err != nil:
return fmt.Errorf("write handshake: %w", err)
case err == nil && n != len(metaBytes):
return fmt.Errorf("incomplete handshake send")
}
if _, err = io.ReadFull(intf.conn, metaBytes); err != nil {
return fmt.Errorf("read handshake: %w", err)
}
2022-09-24 16:05:44 +00:00
if err = intf.conn.SetDeadline(time.Time{}); err != nil {
return fmt.Errorf("failed to clear handshake deadline: %w", err)
}
meta = version_metadata{}
2021-05-10 21:31:01 +00:00
base := version_getBaseMetadata()
if !meta.decode(metaBytes) {
return errors.New("failed to decode metadata")
}
2021-05-10 21:31:01 +00:00
if !meta.check() {
var connectError string
if intf.incoming {
connectError = "Rejected incoming connection"
} else {
connectError = "Failed to connect"
}
intf.links.core.log.Debugf("%s: %s is incompatible version (local %s, remote %s)",
connectError,
2021-05-10 21:31:01 +00:00
intf.lname,
fmt.Sprintf("%d.%d", base.ver, base.minorVer),
fmt.Sprintf("%d.%d", meta.ver, meta.minorVer),
)
return errors.New("remote node is incompatible version")
}
// Check if the remote side matches the keys we expected. This is a bit of a weak
// check - in future versions we really should check a signature or something like that.
if pinned := intf.options.pinnedEd25519Keys; len(pinned) > 0 {
2021-05-23 19:33:28 +00:00
var key keyArray
copy(key[:], meta.key)
if _, allowed := pinned[key]; !allowed {
2022-10-22 15:23:25 +00:00
return fmt.Errorf("node public key that does not match pinned keys")
}
}
2019-01-31 23:29:18 +00:00
// Check if we're authorized to connect to this key / IP
2022-07-24 09:23:25 +00:00
allowed := intf.links.core.config._allowedPublicKeys
2021-05-10 21:39:12 +00:00
isallowed := len(allowed) == 0
2022-07-24 09:23:25 +00:00
for k := range allowed {
if bytes.Equal(k[:], meta.key) {
2021-05-10 21:39:12 +00:00
isallowed = true
break
}
}
if intf.incoming && !intf.force && !isallowed {
2022-09-24 16:05:44 +00:00
_ = intf.close()
2022-10-22 15:23:25 +00:00
return fmt.Errorf("node public key %q is not in AllowedPublicKeys", hex.EncodeToString(meta.key))
2019-01-31 23:29:18 +00:00
}
phony.Block(intf.links, func() {
intf.links._links[intf.info] = intf
})
2022-10-22 15:23:25 +00:00
dir := "outbound"
if intf.incoming {
dir = "inbound"
}
remoteAddr := net.IP(address.AddrForKey(meta.key)[:]).String()
remoteStr := fmt.Sprintf("%s@%s", remoteAddr, intf.info.remote)
localStr := intf.conn.LocalAddr()
2022-10-22 15:23:25 +00:00
intf.links.core.log.Infof("Connected %s %s: %s, source %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr)
err = intf.links.core.HandleConn(meta.key, intf.conn, intf.options.priority)
2022-10-22 13:56:11 +00:00
switch err {
case io.EOF, net.ErrClosed, nil:
2022-10-22 15:23:25 +00:00
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr)
2022-10-22 13:56:11 +00:00
default:
2022-10-22 15:23:25 +00:00
intf.links.core.log.Infof("Disconnected %s %s: %s, source %s; error: %s",
dir, strings.ToUpper(intf.info.linkType), remoteStr, localStr, err)
}
if !intf.incoming && dial != nil {
// The connection was one that we dialled, so wait a second and try to
// dial it again.
var retry func(attempt int)
retry = func(attempt int) {
// intf.links.core.log.Infof("Retrying %s (attempt %d of 5)...", dial.url.String(), attempt)
errch := make(chan error, 1)
if _, err := intf.links.call(dial.url, dial.sintf, errch); err != nil {
return
}
if err := <-errch; err != nil {
if attempt < 3 {
time.AfterFunc(time.Second, func() {
retry(attempt + 1)
})
}
}
}
time.AfterFunc(time.Second, func() {
retry(1)
})
}
return nil
}
func (intf *link) close() error {
return intf.conn.Close()
2020-05-16 22:07:47 +00:00
}
func linkInfoFor(linkType, sintf, remote string) linkInfo {
return linkInfo{
linkType: linkType,
local: sintf,
remote: remote,
}
}
type linkConn struct {
// tx and rx are at the beginning of the struct to ensure 64-bit alignment
// on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
rx uint64
tx uint64
up time.Time
net.Conn
}
func (c *linkConn) Read(p []byte) (n int, err error) {
n, err = c.Conn.Read(p)
atomic.AddUint64(&c.rx, uint64(n))
return
}
func (c *linkConn) Write(p []byte) (n int, err error) {
n, err = c.Conn.Write(p)
atomic.AddUint64(&c.tx, uint64(n))
return
}
func linkOptionsForListener(u *url.URL) (l linkOptions) {
if p := u.Query().Get("priority"); p != "" {
if pi, err := strconv.ParseUint(p, 10, 8); err == nil {
l.priority = uint8(pi)
}
}
return
}