diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index 2486df4..1ef6738 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -28,9 +28,9 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/admin" "github.com/yggdrasil-network/yggdrasil-go/src/config" "github.com/yggdrasil-network/yggdrasil-go/src/defaults" + "github.com/yggdrasil-network/yggdrasil-go/src/ipv6rwc" "github.com/yggdrasil-network/yggdrasil-go/src/core" - "github.com/yggdrasil-network/yggdrasil-go/src/ipv6rwc" "github.com/yggdrasil-network/yggdrasil-go/src/multicast" "github.com/yggdrasil-network/yggdrasil-go/src/tuntap" "github.com/yggdrasil-network/yggdrasil-go/src/version" @@ -290,10 +290,7 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { if err != nil { panic(err) } - options := []core.SetupOption{ - core.IfName(cfg.IfName), - core.IfMTU(cfg.IfMTU), - } + options := []core.SetupOption{} for _, addr := range cfg.Listen { options = append(options, core.ListenAddress(addr)) } @@ -325,7 +322,9 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { if n.admin, err = admin.New(n.core, logger, options...); err != nil { panic(err) } - n.admin.SetupAdminHandlers() + if n.admin != nil { + n.admin.SetupAdminHandlers() + } } // Setup the multicast module. @@ -342,7 +341,7 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { if n.multicast, err = multicast.New(n.core, logger, options...); err != nil { panic(err) } - if n.admin != nil { + if n.admin != nil && n.multicast != nil { n.multicast.SetupAdminHandlers(n.admin) } } @@ -356,7 +355,7 @@ func run(args yggArgs, ctx context.Context, done chan struct{}) { if n.tuntap, err = tuntap.New(ipv6rwc.NewReadWriteCloser(n.core), logger, options...); err != nil { panic(err) } - if n.admin != nil { + if n.admin != nil && n.tuntap != nil { n.tuntap.SetupAdminHandlers(n.admin) } } diff --git a/contrib/mobile/mobile.go b/contrib/mobile/mobile.go index 1e6d378..0cf8718 100644 --- a/contrib/mobile/mobile.go +++ b/contrib/mobile/mobile.go @@ -55,10 +55,7 @@ func (m *Yggdrasil) StartJSON(configjson []byte) error { if err != nil { panic(err) } - options := []core.SetupOption{ - core.IfName("none"), - core.IfMTU(m.config.IfMTU), - } + options := []core.SetupOption{} for _, peer := range m.config.Peers { options = append(options, core.Peer{URI: peer}) } diff --git a/src/admin/admin.go b/src/admin/admin.go index d3e95df..4e98c89 100644 --- a/src/admin/admin.go +++ b/src/admin/admin.go @@ -175,6 +175,9 @@ func (a *AdminSocket) IsStarted() bool { // Stop will stop the admin API and close the socket. func (a *AdminSocket) Stop() error { + if a == nil { + return nil + } if a.listener != nil { select { case <-a.done: @@ -321,6 +324,8 @@ type DataUnit uint64 func (d DataUnit) String() string { switch { + case d > 1024*1024*1024*1024: + return fmt.Sprintf("%2.ftb", float64(d)/1024/1024/1024/1024) case d > 1024*1024*1024: return fmt.Sprintf("%2.fgb", float64(d)/1024/1024/1024) case d > 1024*1024: diff --git a/src/core/api.go b/src/core/api.go index 9f56983..657b551 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -2,6 +2,7 @@ package core import ( "crypto/ed25519" + "fmt" "sync/atomic" "time" @@ -15,6 +16,7 @@ import ( //"sort" //"time" + "github.com/Arceliar/phony" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/util" //"github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -68,11 +70,11 @@ func (c *Core) GetSelf() SelfInfo { func (c *Core) GetPeers() []PeerInfo { var peers []PeerInfo names := make(map[net.Conn]string) - c.links.mutex.Lock() - for _, info := range c.links.links { - names[info.conn] = info.lname - } - c.links.mutex.Unlock() + phony.Block(&c.links, func() { + for _, info := range c.links._links { + names[info.conn] = info.lname + } + }) ps := c.PacketConn.PacketConn.Debug.GetPeers() for _, p := range ps { var info PeerInfo @@ -136,8 +138,17 @@ func (c *Core) GetSessions() []SessionInfo { // Listen starts a new listener (either TCP or TLS). The input should be a url.URL // parsed from a string of the form e.g. "tcp://a.b.c.d:e". In the case of a // link-local address, the interface should be provided as the second argument. -func (c *Core) Listen(u *url.URL, sintf string) (*TcpListener, error) { - return c.links.tcp.listenURL(u, sintf) +func (c *Core) Listen(u *url.URL, sintf string) (*Listener, error) { + switch u.Scheme { + case "tcp": + return c.links.tcp.listen(u, sintf) + case "tls": + return c.links.tls.listen(u, sintf) + case "unix": + return c.links.unix.listen(u, sintf) + default: + return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme) + } } // Address gets the IPv6 address of the Yggdrasil node. This is always a /128 diff --git a/src/core/core.go b/src/core/core.go index 6b9fe54..4cc08ad 100644 --- a/src/core/core.go +++ b/src/core/core.go @@ -40,8 +40,6 @@ type Core struct { _listeners map[ListenAddress]struct{} // configurable after startup nodeinfo NodeInfo // immutable after startup nodeinfoPrivacy NodeInfoPrivacy // immutable after startup - ifname IfName // immutable after startup - ifmtu IfMTU // immutable after startup _allowedPublicKeys map[[32]byte]struct{} // configurable after startup } } @@ -50,6 +48,12 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C c := &Core{ log: logger, } + if name := version.BuildName(); name != "unknown" { + c.log.Infoln("Build name:", name) + } + if version := version.BuildVersion(); version != "unknown" { + c.log.Infoln("Build version:", version) + } c.ctx, c.cancel = context.WithCancel(context.Background()) // Take a copy of the private key so that it is in our own memory space. if len(secret) != ed25519.PrivateKeySize { @@ -78,15 +82,17 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C if err := c.proto.nodeinfo.setNodeInfo(c.config.nodeinfo, bool(c.config.nodeinfoPrivacy)); err != nil { return nil, fmt.Errorf("error setting node info: %w", err) } - c.addPeerTimer = time.AfterFunc(time.Minute, func() { - c.Act(nil, c._addPeerLoop) - }) - if name := version.BuildName(); name != "unknown" { - c.log.Infoln("Build name:", name) - } - if version := version.BuildVersion(); version != "unknown" { - c.log.Infoln("Build version:", version) + for listenaddr := range c.config._listeners { + u, err := url.Parse(string(listenaddr)) + if err != nil { + c.log.Errorf("Invalid listener URI %q specified, ignoring\n", listenaddr) + continue + } + if _, err = c.links.listen(u, ""); err != nil { + c.log.Errorf("Failed to start listener %q: %s\n", listenaddr, err) + } } + c.Act(nil, c._addPeerLoop) return c, nil } @@ -94,10 +100,11 @@ func New(secret ed25519.PrivateKey, logger util.Logger, opts ...SetupOption) (*C // configure them. The loop ensures that disconnected peers will eventually // be reconnected with. func (c *Core) _addPeerLoop() { - if c.addPeerTimer == nil { + select { + case <-c.ctx.Done(): return + default: } - // Add peers from the Peers section for peer := range c.config._peers { go func(peer string, intf string) { @@ -128,12 +135,12 @@ func (c *Core) Stop() { // This function is unsafe and should only be ran by the core actor. func (c *Core) _close() error { c.cancel() + _ = c.links.shutdown() err := c.PacketConn.Close() if c.addPeerTimer != nil { c.addPeerTimer.Stop() c.addPeerTimer = nil } - _ = c.links.stop() return err } diff --git a/src/core/core_test.go b/src/core/core_test.go index ed5b425..8d57f33 100644 --- a/src/core/core_test.go +++ b/src/core/core_test.go @@ -37,10 +37,10 @@ func CreateAndConnectTwo(t testing.TB, verbose bool) (nodeA *Core, nodeB *Core) t.Fatal(err) } logger := GetLoggerWithPrefix("", false) - if nodeA, err = New(skA, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { + if nodeA, err = New(skA, logger, ListenAddress("tcp://127.0.0.1:0")); err != nil { t.Fatal(err) } - if nodeB, err = New(skB, logger, ListenAddress("tcp://127.0.0.1:0"), IfName("none")); err != nil { + if nodeB, err = New(skB, logger, ListenAddress("tcp://127.0.0.1:0")); err != nil { t.Fatal(err) } diff --git a/src/core/link.go b/src/core/link.go index 099a8af..08e8dfc 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "crypto/ed25519" "encoding/hex" "errors" "fmt" @@ -10,7 +9,6 @@ import ( "net" "net/url" "strings" - "sync" //"sync/atomic" "time" @@ -20,22 +18,22 @@ import ( "github.com/Arceliar/phony" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/util" - "golang.org/x/net/proxy" //"github.com/Arceliar/phony" // TODO? use instead of mutexes ) type links struct { - core *Core - mutex sync.RWMutex // protects links below - links map[linkInfo]*link - tcp tcp // TCP interface support - stopped chan struct{} + phony.Inbox + core *Core + tcp *linkTCP // TCP interface support + tls *linkTLS // TLS interface support + unix *linkUNIX // UNIX interface support + socks *linkSOCKS // SOCKS interface support + _links map[linkInfo]*link // *link is nil if connection in progress // TODO timeout (to remove from switch), read from config.ReadTimeout } // linkInfo is used as a map key type linkInfo struct { - key keyArray linkType string // Type of link, e.g. TCP, AWDL local string // Local name or address remote string // Remote name or address @@ -49,19 +47,30 @@ type link struct { info linkInfo incoming bool force bool - closed chan struct{} } type linkOptions struct { pinnedEd25519Keys map[keyArray]struct{} } +type Listener struct { + net.Listener + closed chan struct{} +} + +func (l *Listener) Close() error { + err := l.Listener.Close() + <-l.closed + return err +} + func (l *links) init(c *Core) error { l.core = c - l.mutex.Lock() - l.links = make(map[linkInfo]*link) - l.mutex.Unlock() - l.stopped = make(chan struct{}) + l.tcp = l.newLinkTCP() + l.tls = l.newLinkTLS(l.tcp) + l.unix = l.newLinkUNIX() + l.socks = l.newLinkSOCKS() + l._links = make(map[linkInfo]*link) var listeners []ListenAddress phony.Block(c, func() { @@ -70,96 +79,160 @@ func (l *links) init(c *Core) error { listeners = append(listeners, listener) } }) - if err := l.tcp.init(l, listeners); err != nil { - c.log.Errorln("Failed to start TCP interface") - return err - } return nil } +func (l *links) shutdown() error { + phony.Block(l.tcp, func() { + for l := range l.tcp._listeners { + l.Close() + } + }) + phony.Block(l.tls, func() { + for l := range l.tls._listeners { + l.Close() + } + }) + phony.Block(l.unix, func() { + for l := range l.unix._listeners { + l.Close() + } + }) + return nil +} + +func (l *links) isConnectedTo(info linkInfo) bool { + var isConnected bool + phony.Block(l, func() { + _, isConnected = l._links[info] + }) + return isConnected +} + func (l *links) call(u *url.URL, sintf string) error { - tcpOpts := tcpOptions{} - if pubkeys, ok := u.Query()["key"]; ok && len(pubkeys) > 0 { - tcpOpts.pinnedEd25519Keys = make(map[keyArray]struct{}) - for _, pubkey := range pubkeys { - if sigPub, err := hex.DecodeString(pubkey); err == nil { - var sigPubKey keyArray - copy(sigPubKey[:], sigPub) - tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{} - } + info := linkInfoFor(u.Scheme, sintf, u.Host) + if l.isConnectedTo(info) { + return fmt.Errorf("already connected to this node") + } + options := linkOptions{ + pinnedEd25519Keys: map[keyArray]struct{}{}, + } + for _, pubkey := range u.Query()["key"] { + if sigPub, err := hex.DecodeString(pubkey); err == nil { + var sigPubKey keyArray + copy(sigPubKey[:], sigPub) + options.pinnedEd25519Keys[sigPubKey] = struct{}{} } } - switch u.Scheme { + switch info.linkType { case "tcp": - l.tcp.call(u.Host, tcpOpts, sintf) + go func() { + if err := l.tcp.dial(u, options, sintf); err != nil { + l.core.log.Warnf("Failed to dial TCP %s: %s\n", u.Host, err) + } + }() + case "socks": - tcpOpts.socksProxyAddr = u.Host - if u.User != nil { - tcpOpts.socksProxyAuth = &proxy.Auth{} - tcpOpts.socksProxyAuth.User = u.User.Username() - tcpOpts.socksProxyAuth.Password, _ = u.User.Password() - } - tcpOpts.upgrade = l.tcp.tls.forDialer // TODO make this configurable - pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") - l.tcp.call(pathtokens[0], tcpOpts, sintf) + go func() { + if err := l.socks.dial(u, options); err != nil { + l.core.log.Warnf("Failed to dial SOCKS %s: %s\n", u.Host, err) + } + }() + case "tls": - tcpOpts.upgrade = l.tcp.tls.forDialer // SNI headers must contain hostnames and not IP addresses, so we must make sure // that we do not populate the SNI with an IP literal. We do this by splitting // the host-port combo from the query option and then seeing if it parses to an // IP address successfully or not. + var tlsSNI string if sni := u.Query().Get("sni"); sni != "" { if net.ParseIP(sni) == nil { - tcpOpts.tlsSNI = sni + tlsSNI = sni } } // If the SNI is not configured still because the above failed then we'll try // again but this time we'll use the host part of the peering URI instead. - if tcpOpts.tlsSNI == "" { + if tlsSNI == "" { if host, _, err := net.SplitHostPort(u.Host); err == nil && net.ParseIP(host) == nil { - tcpOpts.tlsSNI = host + tlsSNI = host } } - l.tcp.call(u.Host, tcpOpts, sintf) + go func() { + if err := l.tls.dial(u, options, sintf, tlsSNI); err != nil { + l.core.log.Warnf("Failed to dial TLS %s: %s\n", u.Host, err) + } + }() + + case "unix": + go func() { + if err := l.unix.dial(u, options, sintf); err != nil { + l.core.log.Warnf("Failed to dial UNIX %s: %s\n", u.Host, err) + } + }() + default: return errors.New("unknown call scheme: " + u.Scheme) } return nil } -func (l *links) create(conn net.Conn, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) { - // Technically anything unique would work for names, but let's pick something human readable, just for debugging +func (l *links) listen(u *url.URL, sintf string) (*Listener, error) { + var listener *Listener + var err error + switch u.Scheme { + case "tcp": + listener, err = l.tcp.listen(u, sintf) + case "tls": + listener, err = l.tls.listen(u, sintf) + case "unix": + listener, err = l.unix.listen(u, sintf) + default: + return nil, fmt.Errorf("unrecognised scheme %q", u.Scheme) + } + return listener, err +} + +func (l *links) create(conn net.Conn, name string, info linkInfo, incoming, force bool, options linkOptions) error { intf := link{ conn: &linkConn{ Conn: conn, up: time.Now(), }, - lname: name, - links: l, - options: options, - info: linkInfo{ - linkType: linkType, - local: local, - remote: remote, - }, + lname: name, + links: l, + options: options, + info: info, incoming: incoming, force: force, } - return &intf, nil -} - -func (l *links) stop() error { - close(l.stopped) - if err := l.tcp.stop(); err != nil { - return err - } + go func() { + if err := intf.handler(); err != nil { + l.core.log.Errorf("Link handler %s error (%s): %s", name, conn.RemoteAddr(), err) + } + }() return nil } -func (intf *link) handler() (chan struct{}, error) { - // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later +func (intf *link) handler() error { defer intf.conn.Close() + + // Don't connect to this link more than once. + if intf.links.isConnectedTo(intf.info) { + return fmt.Errorf("already connected to this node") + } + + // Mark the connection as in progress. + phony.Block(intf.links, func() { + intf.links._links[intf.info] = nil + }) + + // When we're done, clean up the connection entry. + defer phony.Block(intf.links, func() { + delete(intf.links._links, intf.info) + }) + + // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later meta := version_getBaseMetadata() meta.key = intf.links.core.public metaBytes := meta.encode() @@ -172,10 +245,10 @@ func (intf *link) handler() (chan struct{}, error) { err = errors.New("incomplete metadata send") } }) { - return nil, errors.New("timeout on metadata send") + return errors.New("timeout on metadata send") } if err != nil { - return nil, err + return fmt.Errorf("write handshake: %w", err) } if !util.FuncTimeout(30*time.Second, func() { var n int @@ -184,15 +257,15 @@ func (intf *link) handler() (chan struct{}, error) { err = errors.New("incomplete metadata recv") } }) { - return nil, errors.New("timeout on metadata recv") + return errors.New("timeout on metadata recv") } if err != nil { - return nil, err + return fmt.Errorf("read handshake: %w", err) } meta = version_metadata{} base := version_getBaseMetadata() if !meta.decode(metaBytes) { - return nil, errors.New("failed to decode metadata") + return errors.New("failed to decode metadata") } if !meta.check() { var connectError string @@ -207,16 +280,16 @@ func (intf *link) handler() (chan struct{}, error) { fmt.Sprintf("%d.%d", base.ver, base.minorVer), fmt.Sprintf("%d.%d", meta.ver, meta.minorVer), ) - return nil, errors.New("remote node is incompatible version") + return errors.New("remote node is incompatible version") } // Check if the remote side matches the keys we expected. This is a bit of a weak // check - in future versions we really should check a signature or something like that. - if pinned := intf.options.pinnedEd25519Keys; pinned != nil { + if pinned := intf.options.pinnedEd25519Keys; len(pinned) > 0 { var key keyArray copy(key[:], meta.key) if _, allowed := pinned[key]; !allowed { intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name()) - return nil, fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys") + return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys") } } // Check if we're authorized to connect to this key / IP @@ -232,55 +305,50 @@ func (intf *link) handler() (chan struct{}, error) { intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.key)) intf.close() - return nil, nil + return fmt.Errorf("forbidden connection") } - // Check if we already have a link to this node - copy(intf.info.key[:], meta.key) - intf.links.mutex.Lock() - if oldIntf, isIn := intf.links.links[intf.info]; isIn { - intf.links.mutex.Unlock() - // FIXME we should really return an error and let the caller block instead - // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. - intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name()) - return oldIntf.closed, nil - } else { - intf.closed = make(chan struct{}) - intf.links.links[intf.info] = intf - defer func() { - intf.links.mutex.Lock() - delete(intf.links.links, intf.info) - intf.links.mutex.Unlock() - close(intf.closed) - }() - intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name()) - } - intf.links.mutex.Unlock() - themAddr := address.AddrForKey(ed25519.PublicKey(intf.info.key[:])) - themAddrString := net.IP(themAddr[:]).String() - themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) + + phony.Block(intf.links, func() { + intf.links._links[intf.info] = intf + }) + + remoteAddr := net.IP(address.AddrForKey(meta.key)[:]).String() + remoteStr := fmt.Sprintf("%s@%s", remoteAddr, intf.info.remote) + localStr := intf.conn.LocalAddr() intf.links.core.log.Infof("Connected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) - // Run the handler - err = intf.links.core.HandleConn(ed25519.PublicKey(intf.info.key[:]), intf.conn) + strings.ToUpper(intf.info.linkType), remoteStr, localStr) + // TODO don't report an error if it's just a 'use of closed network connection' - if err != nil { + if err = intf.links.core.HandleConn(meta.key, intf.conn); err != nil && err != io.EOF { intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) + strings.ToUpper(intf.info.linkType), remoteStr, localStr, err) } else { intf.links.core.log.Infof("Disconnected %s: %s, source %s", - strings.ToUpper(intf.info.linkType), themString, intf.info.local) + strings.ToUpper(intf.info.linkType), remoteStr, localStr) } - return nil, err + + return nil } -func (intf *link) close() { - intf.conn.Close() +func (intf *link) close() error { + return intf.conn.Close() } func (intf *link) name() string { return intf.lname } +func linkInfoFor(linkType, sintf, remote string) linkInfo { + if h, _, err := net.SplitHostPort(remote); err == nil { + remote = h + } + return linkInfo{ + linkType: linkType, + local: sintf, + remote: remote, + } +} + type linkConn struct { // tx and rx are at the beginning of the struct to ensure 64-bit alignment // on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG diff --git a/src/core/link_socks.go b/src/core/link_socks.go new file mode 100644 index 0000000..ad5b8c9 --- /dev/null +++ b/src/core/link_socks.go @@ -0,0 +1,52 @@ +package core + +import ( + "fmt" + "net" + "net/url" + "strings" + + "golang.org/x/net/proxy" +) + +type linkSOCKS struct { + *links +} + +func (l *links) newLinkSOCKS() *linkSOCKS { + lt := &linkSOCKS{ + links: l, + } + return lt +} + +func (l *linkSOCKS) dial(url *url.URL, options linkOptions) error { + info := linkInfoFor("socks", "", url.Path) + if l.links.isConnectedTo(info) { + return fmt.Errorf("duplicate connection attempt") + } + proxyAuth := &proxy.Auth{} + proxyAuth.User = url.User.Username() + proxyAuth.Password, _ = url.User.Password() + dialer, err := proxy.SOCKS5("tcp", url.Host, proxyAuth, proxy.Direct) + if err != nil { + return fmt.Errorf("failed to configure proxy") + } + pathtokens := strings.Split(strings.Trim(url.Path, "/"), "/") + conn, err := dialer.Dial("tcp", pathtokens[0]) + if err != nil { + return err + } + return l.handler(url.String(), info, conn, options, false) +} + +func (l *linkSOCKS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error { + return l.links.create( + conn, // connection + name, // connection name + info, // connection info + incoming, // not incoming + false, // not forced + options, // connection options + ) +} diff --git a/src/core/link_tcp.go b/src/core/link_tcp.go new file mode 100644 index 0000000..fcac8b4 --- /dev/null +++ b/src/core/link_tcp.go @@ -0,0 +1,183 @@ +package core + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "time" + + "github.com/Arceliar/phony" +) + +type linkTCP struct { + phony.Inbox + *links + listener *net.ListenConfig + _listeners map[*Listener]context.CancelFunc +} + +func (l *links) newLinkTCP() *linkTCP { + lt := &linkTCP{ + links: l, + listener: &net.ListenConfig{ + KeepAlive: -1, + }, + _listeners: map[*Listener]context.CancelFunc{}, + } + lt.listener.Control = lt.tcpContext + return lt +} + +func (l *linkTCP) dial(url *url.URL, options linkOptions, sintf string) error { + info := linkInfoFor("tcp", sintf, strings.SplitN(url.Host, "%", 2)[0]) + if l.links.isConnectedTo(info) { + return fmt.Errorf("duplicate connection attempt") + } + addr, err := net.ResolveTCPAddr("tcp", url.Host) + if err != nil { + return err + } + addr.Zone = sintf + dialer, err := l.dialerFor(addr.String(), sintf) + if err != nil { + return err + } + conn, err := dialer.DialContext(l.core.ctx, "tcp", addr.String()) + if err != nil { + return err + } + return l.handler(url.String(), info, conn, options, false) +} + +func (l *linkTCP) listen(url *url.URL, sintf string) (*Listener, error) { + ctx, cancel := context.WithCancel(l.core.ctx) + hostport := url.Host + if sintf != "" { + if host, port, err := net.SplitHostPort(hostport); err == nil { + hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port) + } + } + listener, err := l.listener.Listen(ctx, "tcp", hostport) + if err != nil { + cancel() + return nil, err + } + entry := &Listener{ + Listener: listener, + closed: make(chan struct{}), + } + phony.Block(l, func() { + l._listeners[entry] = cancel + }) + l.core.log.Printf("TCP listener started on %s", listener.Addr()) + go func() { + defer phony.Block(l, func() { + delete(l._listeners, entry) + }) + for { + conn, err := listener.Accept() + if err != nil { + cancel() + break + } + addr := conn.RemoteAddr().(*net.TCPAddr) + name := fmt.Sprintf("tls://%s", addr) + info := linkInfoFor("tcp", sintf, strings.SplitN(addr.IP.String(), "%", 2)[0]) + if err = l.handler(name, info, conn, linkOptions{}, true); err != nil { + l.core.log.Errorln("Failed to create inbound link:", err) + } + } + listener.Close() + close(entry.closed) + l.core.log.Printf("TCP listener stopped on %s", listener.Addr()) + }() + return entry, nil +} + +func (l *linkTCP) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error { + return l.links.create( + conn, // connection + name, // connection name + info, // connection info + incoming, // not incoming + false, // not forced + options, // connection options + ) +} + +// Returns the address of the listener. +func (l *linkTCP) getAddr() *net.TCPAddr { + // TODO: Fix this, because this will currently only give a single address + // to multicast.go, which obviously is not great, but right now multicast.go + // doesn't have the ability to send more than one address in a packet either + var addr *net.TCPAddr + phony.Block(l, func() { + for listener := range l._listeners { + addr = listener.Addr().(*net.TCPAddr) + } + }) + return addr +} + +func (l *linkTCP) dialerFor(saddr, sintf string) (*net.Dialer, error) { + dst, err := net.ResolveTCPAddr("tcp", saddr) + if err != nil { + return nil, err + } + if dst.IP.IsLinkLocalUnicast() { + dst.Zone = sintf + if dst.Zone == "" { + return nil, fmt.Errorf("link-local address requires a zone") + } + } + dialer := &net.Dialer{ + Timeout: time.Second * 5, + KeepAlive: -1, + Control: l.tcpContext, + } + if sintf != "" { + dialer.Control = l.getControl(sintf) + ief, err := net.InterfaceByName(sintf) + if err != nil { + return nil, fmt.Errorf("interface %q not found", sintf) + } + if ief.Flags&net.FlagUp == 0 { + return nil, fmt.Errorf("interface %q is not up", sintf) + } + addrs, err := ief.Addrs() + if err != nil { + return nil, fmt.Errorf("interface %q addresses not available: %w", sintf, err) + } + for addrindex, addr := range addrs { + src, _, err := net.ParseCIDR(addr.String()) + if err != nil { + continue + } + if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() { + continue + } + bothglobal := src.IsGlobalUnicast() == dst.IP.IsGlobalUnicast() + bothlinklocal := src.IsLinkLocalUnicast() == dst.IP.IsLinkLocalUnicast() + if !bothglobal && !bothlinklocal { + continue + } + if (src.To4() != nil) != (dst.IP.To4() != nil) { + continue + } + if bothglobal || bothlinklocal || addrindex == len(addrs)-1 { + dialer.LocalAddr = &net.TCPAddr{ + IP: src, + Port: 0, + Zone: sintf, + } + break + } + } + if dialer.LocalAddr == nil { + return nil, fmt.Errorf("no suitable source address found on interface %q", sintf) + } + } + return dialer, nil +} diff --git a/src/core/tcp_darwin.go b/src/core/link_tcp_darwin.go similarity index 74% rename from src/core/tcp_darwin.go rename to src/core/link_tcp_darwin.go index 2ea3abc..daa51df 100644 --- a/src/core/tcp_darwin.go +++ b/src/core/link_tcp_darwin.go @@ -11,7 +11,7 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { +func (t *linkTCP) tcpContext(network, address string, c syscall.RawConn) error { var control error var recvanyif error @@ -28,6 +28,6 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { } } -func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error { +func (t *linkTCP) getControl(sintf string) func(string, string, syscall.RawConn) error { return t.tcpContext } diff --git a/src/core/tcp_linux.go b/src/core/link_tcp_linux.go similarity index 86% rename from src/core/tcp_linux.go rename to src/core/link_tcp_linux.go index e59c312..9e875fe 100644 --- a/src/core/tcp_linux.go +++ b/src/core/link_tcp_linux.go @@ -11,7 +11,7 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { +func (t *linkTCP) tcpContext(network, address string, c syscall.RawConn) error { var control error var bbr error @@ -31,7 +31,7 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { return nil } -func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error { +func (t *linkTCP) getControl(sintf string) func(string, string, syscall.RawConn) error { return func(network, address string, c syscall.RawConn) error { var err error btd := func(fd uintptr) { diff --git a/src/core/tcp_other.go b/src/core/link_tcp_other.go similarity index 55% rename from src/core/tcp_other.go rename to src/core/link_tcp_other.go index 8dd76f2..f8a5aec 100644 --- a/src/core/tcp_other.go +++ b/src/core/link_tcp_other.go @@ -9,10 +9,10 @@ import ( // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go -func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error { +func (t *linkTCP) tcpContext(network, address string, c syscall.RawConn) error { return nil } -func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) error { +func (t *linkTCP) getControl(sintf string) func(string, string, syscall.RawConn) error { return t.tcpContext } diff --git a/src/core/link_tls.go b/src/core/link_tls.go new file mode 100644 index 0000000..e6ffd5a --- /dev/null +++ b/src/core/link_tls.go @@ -0,0 +1,171 @@ +package core + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/hex" + "encoding/pem" + "fmt" + "math/big" + "net" + "net/url" + "strings" + "time" + + "github.com/Arceliar/phony" +) + +type linkTLS struct { + phony.Inbox + *links + tcp *linkTCP + listener *net.ListenConfig + config *tls.Config + _listeners map[*Listener]context.CancelFunc +} + +func (l *links) newLinkTLS(tcp *linkTCP) *linkTLS { + lt := &linkTLS{ + links: l, + tcp: tcp, + listener: &net.ListenConfig{ + Control: tcp.tcpContext, + KeepAlive: -1, + }, + _listeners: map[*Listener]context.CancelFunc{}, + } + var err error + lt.config, err = lt.generateConfig() + if err != nil { + panic(err) + } + return lt +} + +func (l *linkTLS) dial(url *url.URL, options linkOptions, sintf, sni string) error { + info := linkInfoFor("tls", sintf, strings.SplitN(url.Host, "%", 2)[0]) + if l.links.isConnectedTo(info) { + return fmt.Errorf("duplicate connection attempt") + } + addr, err := net.ResolveTCPAddr("tcp", url.Host) + if err != nil { + return err + } + addr.Zone = sintf + dialer, err := l.tcp.dialerFor(addr.String(), sintf) + if err != nil { + return err + } + tlsconfig := l.config.Clone() + tlsconfig.ServerName = sni + tlsdialer := &tls.Dialer{ + NetDialer: dialer, + Config: tlsconfig, + } + conn, err := tlsdialer.DialContext(l.core.ctx, "tcp", addr.String()) + if err != nil { + return err + } + return l.handler(url.String(), info, conn, options, false) +} + +func (l *linkTLS) listen(url *url.URL, sintf string) (*Listener, error) { + ctx, cancel := context.WithCancel(l.core.ctx) + hostport := url.Host + if sintf != "" { + if host, port, err := net.SplitHostPort(hostport); err == nil { + hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port) + } + } + listener, err := l.listener.Listen(ctx, "tcp", hostport) + if err != nil { + cancel() + return nil, err + } + tlslistener := tls.NewListener(listener, l.config) + entry := &Listener{ + Listener: tlslistener, + closed: make(chan struct{}), + } + phony.Block(l, func() { + l._listeners[entry] = cancel + }) + l.core.log.Printf("TLS listener started on %s", listener.Addr()) + go func() { + defer phony.Block(l, func() { + delete(l._listeners, entry) + }) + for { + conn, err := tlslistener.Accept() + if err != nil { + cancel() + break + } + addr := conn.RemoteAddr().(*net.TCPAddr) + name := fmt.Sprintf("tls://%s", addr) + info := linkInfoFor("tls", sintf, strings.SplitN(addr.IP.String(), "%", 2)[0]) + if err = l.handler(name, info, conn, linkOptions{}, true); err != nil { + l.core.log.Errorln("Failed to create inbound link:", err) + } + } + tlslistener.Close() + close(entry.closed) + l.core.log.Printf("TLS listener stopped on %s", listener.Addr()) + }() + return entry, nil +} + +func (l *linkTLS) generateConfig() (*tls.Config, error) { + certBuf := &bytes.Buffer{} + + // TODO: because NotAfter is finite, we should add some mechanism to + // regenerate the certificate and restart the listeners periodically + // for nodes with very high uptimes. Perhaps regenerate certs and restart + // listeners every few months or so. + cert := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + CommonName: hex.EncodeToString(l.links.core.public[:]), + }, + NotBefore: time.Now(), + NotAfter: time.Now().Add(time.Hour * 24 * 365), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + + certbytes, err := x509.CreateCertificate(rand.Reader, &cert, &cert, l.links.core.public, l.links.core.secret) + if err != nil { + return nil, err + } + + if err := pem.Encode(certBuf, &pem.Block{ + Type: "CERTIFICATE", + Bytes: certbytes, + }); err != nil { + return nil, err + } + + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(certbytes) + + return &tls.Config{ + RootCAs: rootCAs, + Certificates: []tls.Certificate{ + { + Certificate: [][]byte{certbytes}, + PrivateKey: l.links.core.secret, + }, + }, + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS13, + }, nil +} + +func (l *linkTLS) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error { + return l.tcp.handler(name, info, conn, options, incoming) +} diff --git a/src/core/link_unix.go b/src/core/link_unix.go new file mode 100644 index 0000000..d63c1d9 --- /dev/null +++ b/src/core/link_unix.go @@ -0,0 +1,98 @@ +package core + +import ( + "context" + "fmt" + "net" + "net/url" + "time" + + "github.com/Arceliar/phony" +) + +type linkUNIX struct { + phony.Inbox + *links + dialer *net.Dialer + listener *net.ListenConfig + _listeners map[*Listener]context.CancelFunc +} + +func (l *links) newLinkUNIX() *linkUNIX { + lt := &linkUNIX{ + links: l, + dialer: &net.Dialer{ + Timeout: time.Second * 5, + KeepAlive: -1, + }, + listener: &net.ListenConfig{ + KeepAlive: -1, + }, + _listeners: map[*Listener]context.CancelFunc{}, + } + return lt +} + +func (l *linkUNIX) dial(url *url.URL, options linkOptions, _ string) error { + info := linkInfoFor("unix", "", url.Path) + if l.links.isConnectedTo(info) { + return fmt.Errorf("duplicate connection attempt") + } + addr, err := net.ResolveUnixAddr("unix", url.Path) + if err != nil { + return err + } + conn, err := l.dialer.DialContext(l.core.ctx, "unix", addr.String()) + if err != nil { + return err + } + return l.handler(url.String(), info, conn, options, false) +} + +func (l *linkUNIX) listen(url *url.URL, _ string) (*Listener, error) { + ctx, cancel := context.WithCancel(l.core.ctx) + listener, err := l.listener.Listen(ctx, "unix", url.Path) + if err != nil { + cancel() + return nil, err + } + entry := &Listener{ + Listener: listener, + closed: make(chan struct{}), + } + phony.Block(l, func() { + l._listeners[entry] = cancel + }) + l.core.log.Printf("UNIX listener started on %s", listener.Addr()) + go func() { + defer phony.Block(l, func() { + delete(l._listeners, entry) + }) + for { + conn, err := listener.Accept() + if err != nil { + cancel() + break + } + info := linkInfoFor("unix", "", url.String()) + if err = l.handler(url.String(), info, conn, linkOptions{}, true); err != nil { + l.core.log.Errorln("Failed to create inbound link:", err) + } + } + listener.Close() + close(entry.closed) + l.core.log.Printf("UNIX listener stopped on %s", listener.Addr()) + }() + return entry, nil +} + +func (l *linkUNIX) handler(name string, info linkInfo, conn net.Conn, options linkOptions, incoming bool) error { + return l.links.create( + conn, // connection + name, // connection name + info, // connection info + incoming, // not incoming + false, // not forced + options, // connection options + ) +} diff --git a/src/core/options.go b/src/core/options.go index b3d06c6..8009aa3 100644 --- a/src/core/options.go +++ b/src/core/options.go @@ -14,10 +14,6 @@ func (c *Core) _applyOption(opt SetupOption) { c.config.nodeinfo = v case NodeInfoPrivacy: c.config.nodeinfoPrivacy = v - case IfName: - c.config.ifname = v - case IfMTU: - c.config.ifmtu = v case AllowedPublicKey: pk := [32]byte{} copy(pk[:], v) @@ -36,14 +32,10 @@ type Peer struct { } type NodeInfo map[string]interface{} type NodeInfoPrivacy bool -type IfName string -type IfMTU uint16 type AllowedPublicKey ed25519.PublicKey func (a ListenAddress) isSetupOption() {} func (a Peer) isSetupOption() {} func (a NodeInfo) isSetupOption() {} func (a NodeInfoPrivacy) isSetupOption() {} -func (a IfName) isSetupOption() {} -func (a IfMTU) isSetupOption() {} func (a AllowedPublicKey) isSetupOption() {} diff --git a/src/core/tcp.go b/src/core/tcp.go deleted file mode 100644 index ab95280..0000000 --- a/src/core/tcp.go +++ /dev/null @@ -1,417 +0,0 @@ -package core - -// This sends packets to peers using TCP as a transport -// It's generally better tested than the UDP implementation -// Using it regularly is insane, but I find TCP easier to test/debug with it -// Updating and optimizing the UDP version is a higher priority - -// TODO: -// Something needs to make sure we're getting *valid* packets -// Could be used to DoS (connect, give someone else's keys, spew garbage) -// I guess the "peer" part should watch for link packets, disconnect? - -// TCP connections start with a metadata exchange. -// It involves exchanging version numbers and crypto keys -// See version.go for version metadata format - -import ( - "context" - "fmt" - "math/rand" - "net" - "net/url" - "strings" - "sync" - "time" - - "golang.org/x/net/proxy" - - "github.com/yggdrasil-network/yggdrasil-go/src/address" - //"github.com/yggdrasil-network/yggdrasil-go/src/util" -) - -const default_timeout = 6 * time.Second - -// The TCP listener and information about active TCP connections, to avoid duplication. -type tcp struct { - links *links - waitgroup sync.WaitGroup - mutex sync.Mutex // Protecting the below - listeners map[string]*TcpListener - calls map[string]struct{} - conns map[linkInfo](chan struct{}) - tls tcptls -} - -// TcpListener is a stoppable TCP listener interface. These are typically -// returned from calls to the ListenTCP() function and are also used internally -// to represent listeners created by the "Listen" configuration option and for -// multicast interfaces. -type TcpListener struct { - Listener net.Listener - opts tcpOptions - stop chan struct{} -} - -type TcpUpgrade struct { - upgrade func(c net.Conn, o *tcpOptions) (net.Conn, error) - name string -} - -type tcpOptions struct { - linkOptions - upgrade *TcpUpgrade - socksProxyAddr string - socksProxyAuth *proxy.Auth - socksPeerAddr string - tlsSNI string -} - -func (l *TcpListener) Stop() { - defer func() { _ = recover() }() - close(l.stop) -} - -// Wrapper function to set additional options for specific connection types. -func (t *tcp) setExtraOptions(c net.Conn) { - switch sock := c.(type) { - case *net.TCPConn: - _ = sock.SetNoDelay(true) - // TODO something for socks5 - default: - } -} - -// Returns the address of the listener. -func (t *tcp) getAddr() *net.TCPAddr { - // TODO: Fix this, because this will currently only give a single address - // to multicast.go, which obviously is not great, but right now multicast.go - // doesn't have the ability to send more than one address in a packet either - t.mutex.Lock() - defer t.mutex.Unlock() - for _, l := range t.listeners { - return l.Listener.Addr().(*net.TCPAddr) - } - return nil -} - -// Initializes the struct. -func (t *tcp) init(l *links, listeners []ListenAddress) error { - t.links = l - t.tls.init(t) - t.mutex.Lock() - t.calls = make(map[string]struct{}) - t.conns = make(map[linkInfo](chan struct{})) - t.listeners = make(map[string]*TcpListener) - t.mutex.Unlock() - - for _, listenaddr := range listeners { - u, err := url.Parse(string(listenaddr)) - if err != nil { - t.links.core.log.Errorln("Failed to parse listener: listener", listenaddr, "is not correctly formatted, ignoring") - continue - } - if _, err := t.listenURL(u, ""); err != nil { - return err - } - } - - return nil -} - -func (t *tcp) stop() error { - t.mutex.Lock() - for _, listener := range t.listeners { - listener.Stop() - } - t.mutex.Unlock() - t.waitgroup.Wait() - return nil -} - -func (t *tcp) listenURL(u *url.URL, sintf string) (*TcpListener, error) { - var listener *TcpListener - var err error - hostport := u.Host // Used for tcp and tls - if len(sintf) != 0 { - host, port, err := net.SplitHostPort(hostport) - if err == nil { - hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port) - } - } - switch u.Scheme { - case "tcp": - listener, err = t.listen(hostport, nil) - case "tls": - listener, err = t.listen(hostport, t.tls.forListener) - default: - t.links.core.log.Errorln("Failed to add listener: listener", u.String(), "is not correctly formatted, ignoring") - } - return listener, err -} - -func (t *tcp) listen(listenaddr string, upgrade *TcpUpgrade) (*TcpListener, error) { - var err error - - ctx := t.links.core.ctx - lc := net.ListenConfig{ - Control: t.tcpContext, - } - listener, err := lc.Listen(ctx, "tcp", listenaddr) - if err == nil { - l := TcpListener{ - Listener: listener, - opts: tcpOptions{upgrade: upgrade}, - stop: make(chan struct{}), - } - t.waitgroup.Add(1) - go t.listener(&l, listenaddr) - return &l, nil - } - - return nil, err -} - -// Runs the listener, which spawns off goroutines for incoming connections. -func (t *tcp) listener(l *TcpListener, listenaddr string) { - defer t.waitgroup.Done() - if l == nil { - return - } - // Track the listener so that we can find it again in future - t.mutex.Lock() - if _, isIn := t.listeners[listenaddr]; isIn { - t.mutex.Unlock() - l.Listener.Close() - return - } - callproto := "TCP" - if l.opts.upgrade != nil { - callproto = strings.ToUpper(l.opts.upgrade.name) - } - t.listeners[listenaddr] = l - t.mutex.Unlock() - // And here we go! - defer func() { - t.links.core.log.Infoln("Stopping", callproto, "listener on:", l.Listener.Addr().String()) - l.Listener.Close() - t.mutex.Lock() - delete(t.listeners, listenaddr) - t.mutex.Unlock() - }() - t.links.core.log.Infoln("Listening for", callproto, "on:", l.Listener.Addr().String()) - go func() { - <-l.stop - l.Listener.Close() - }() - defer l.Stop() - for { - sock, err := l.Listener.Accept() - if err != nil { - t.links.core.log.Errorln("Failed to accept connection:", err) - select { - case <-l.stop: - return - default: - } - time.Sleep(time.Second) // So we don't busy loop - continue - } - t.waitgroup.Add(1) - options := l.opts - go t.handler(sock, true, options) - } -} - -// Checks if we already are calling this address -func (t *tcp) startCalling(saddr string) bool { - t.mutex.Lock() - defer t.mutex.Unlock() - _, isIn := t.calls[saddr] - t.calls[saddr] = struct{}{} - return !isIn -} - -// Checks if a connection already exists. -// If not, it adds it to the list of active outgoing calls (to block future attempts) and dials the address. -// If the dial is successful, it launches the handler. -// When finished, it removes the outgoing call, so reconnection attempts can be made later. -// This all happens in a separate goroutine that it spawns. -func (t *tcp) call(saddr string, options tcpOptions, sintf string) { - go func() { - callname := saddr - callproto := "TCP" - if options.upgrade != nil { - callproto = strings.ToUpper(options.upgrade.name) - } - if sintf != "" { - callname = fmt.Sprintf("%s/%s/%s", callproto, saddr, sintf) - } - if !t.startCalling(callname) { - return - } - defer func() { - // Block new calls for a little while, to mitigate livelock scenarios - rand.Seed(time.Now().UnixNano()) - delay := default_timeout + time.Duration(rand.Intn(10000))*time.Millisecond - time.Sleep(delay) - t.mutex.Lock() - delete(t.calls, callname) - t.mutex.Unlock() - }() - var conn net.Conn - var err error - if options.socksProxyAddr != "" { - if sintf != "" { - return - } - dialerdst, er := net.ResolveTCPAddr("tcp", options.socksProxyAddr) - if er != nil { - return - } - var dialer proxy.Dialer - dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), options.socksProxyAuth, proxy.Direct) - if err != nil { - return - } - ctx, done := context.WithTimeout(t.links.core.ctx, default_timeout) - conn, err = dialer.(proxy.ContextDialer).DialContext(ctx, "tcp", saddr) - done() - if err != nil { - return - } - t.waitgroup.Add(1) - options.socksPeerAddr = saddr - if ch := t.handler(conn, false, options); ch != nil { - <-ch - } - } else { - dst, err := net.ResolveTCPAddr("tcp", saddr) - if err != nil { - return - } - if dst.IP.IsLinkLocalUnicast() { - dst.Zone = sintf - if dst.Zone == "" { - return - } - } - dialer := net.Dialer{ - Control: t.tcpContext, - } - if sintf != "" { - dialer.Control = t.getControl(sintf) - ief, err := net.InterfaceByName(sintf) - if err != nil { - return - } - if ief.Flags&net.FlagUp == 0 { - return - } - addrs, err := ief.Addrs() - if err == nil { - for addrindex, addr := range addrs { - src, _, err := net.ParseCIDR(addr.String()) - if err != nil { - continue - } - if src.Equal(dst.IP) { - continue - } - if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() { - continue - } - bothglobal := src.IsGlobalUnicast() == dst.IP.IsGlobalUnicast() - bothlinklocal := src.IsLinkLocalUnicast() == dst.IP.IsLinkLocalUnicast() - if !bothglobal && !bothlinklocal { - continue - } - if (src.To4() != nil) != (dst.IP.To4() != nil) { - continue - } - if bothglobal || bothlinklocal || addrindex == len(addrs)-1 { - dialer.LocalAddr = &net.TCPAddr{ - IP: src, - Port: 0, - Zone: sintf, - } - break - } - } - if dialer.LocalAddr == nil { - return - } - } - } - ctx, done := context.WithTimeout(t.links.core.ctx, default_timeout) - conn, err = dialer.DialContext(ctx, "tcp", dst.String()) - done() - if err != nil { - t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err) - return - } - t.waitgroup.Add(1) - if ch := t.handler(conn, false, options); ch != nil { - <-ch - } - } - }() -} - -func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) chan struct{} { - defer t.waitgroup.Done() // Happens after sock.close - defer sock.Close() - t.setExtraOptions(sock) - var upgraded bool - if options.upgrade != nil { - var err error - if sock, err = options.upgrade.upgrade(sock, &options); err != nil { - t.links.core.log.Errorln("TCP handler upgrade failed:", err) - return nil - } - upgraded = true - } - var name, proto, local, remote string - if options.socksProxyAddr != "" { - name = "socks://" + sock.RemoteAddr().String() + "/" + options.socksPeerAddr - proto = "socks" - local, _, _ = net.SplitHostPort(sock.LocalAddr().String()) - remote, _, _ = net.SplitHostPort(options.socksPeerAddr) - } else { - if upgraded { - proto = options.upgrade.name - name = proto + "://" + sock.RemoteAddr().String() - } else { - proto = "tcp" - name = proto + "://" + sock.RemoteAddr().String() - } - local, _, _ = net.SplitHostPort(sock.LocalAddr().String()) - remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) - } - localIP := net.ParseIP(local) - if localIP = localIP.To16(); localIP != nil { - var laddr address.Address - var lsubnet address.Subnet - copy(laddr[:], localIP) - copy(lsubnet[:], localIP) - if laddr.IsValid() || lsubnet.IsValid() { - // The local address is with the network address/prefix range - // This would route ygg over ygg, which we don't want - // FIXME ideally this check should happen outside of the core library - // Maybe dial/listen at the application level - // Then pass a net.Conn to the core library (after these kinds of checks are done) - t.links.core.log.Debugln("Dropping ygg-tunneled connection", local, remote) - return nil - } - } - force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() - link, err := t.links.create(sock, name, proto, local, remote, incoming, force, options.linkOptions) - if err != nil { - t.links.core.log.Println(err) - panic(err) - } - t.links.core.log.Debugln("DEBUG: starting handler for", name) - ch, err := link.handler() - t.links.core.log.Debugln("DEBUG: stopped handler for", name, err) - return ch -} diff --git a/src/core/tls.go b/src/core/tls.go deleted file mode 100644 index 9e340ac..0000000 --- a/src/core/tls.go +++ /dev/null @@ -1,126 +0,0 @@ -package core - -import ( - "bytes" - "crypto/ed25519" - "crypto/rand" - "crypto/tls" - "crypto/x509" - "crypto/x509/pkix" - "encoding/hex" - "encoding/pem" - "errors" - "log" - "math/big" - "net" - "time" -) - -type tcptls struct { - tcp *tcp - config *tls.Config - forDialer *TcpUpgrade - forListener *TcpUpgrade -} - -func (t *tcptls) init(tcp *tcp) { - t.tcp = tcp - t.forDialer = &TcpUpgrade{ - upgrade: t.upgradeDialer, - name: "tls", - } - t.forListener = &TcpUpgrade{ - upgrade: t.upgradeListener, - name: "tls", - } - - edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize) - copy(edpriv[:], tcp.links.core.secret[:]) - - certBuf := &bytes.Buffer{} - - // TODO: because NotAfter is finite, we should add some mechanism to regenerate the certificate and restart the listeners periodically for nodes with very high uptimes. Perhaps regenerate certs and restart listeners every few months or so. - pubtemp := x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{ - CommonName: hex.EncodeToString(tcp.links.core.public[:]), - }, - NotBefore: time.Now(), - NotAfter: time.Now().Add(time.Hour * 24 * 365), - KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, - BasicConstraintsValid: true, - } - - derbytes, err := x509.CreateCertificate(rand.Reader, &pubtemp, &pubtemp, edpriv.Public(), edpriv) - if err != nil { - log.Fatalf("Failed to create certificate: %s", err) - } - - if err := pem.Encode(certBuf, &pem.Block{Type: "CERTIFICATE", Bytes: derbytes}); err != nil { - panic("failed to encode certificate into PEM") - } - - cpool := x509.NewCertPool() - cpool.AppendCertsFromPEM(derbytes) - - t.config = &tls.Config{ - RootCAs: cpool, - Certificates: []tls.Certificate{ - { - Certificate: [][]byte{derbytes}, - PrivateKey: edpriv, - }, - }, - InsecureSkipVerify: true, - MinVersion: tls.VersionTLS13, - } -} - -func (t *tcptls) configForOptions(options *tcpOptions) *tls.Config { - config := t.config.Clone() - config.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error { - if len(rawCerts) != 1 { - return errors.New("tls not exactly 1 cert") - } - cert, err := x509.ParseCertificate(rawCerts[0]) - if err != nil { - return errors.New("tls failed to parse cert") - } - if cert.PublicKeyAlgorithm != x509.Ed25519 { - return errors.New("tls wrong cert algorithm") - } - pk := cert.PublicKey.(ed25519.PublicKey) - var key keyArray - copy(key[:], pk) - // If options does not have a pinned key, then pin one now - if options.pinnedEd25519Keys == nil { - options.pinnedEd25519Keys = make(map[keyArray]struct{}) - options.pinnedEd25519Keys[key] = struct{}{} - } - if _, isIn := options.pinnedEd25519Keys[key]; !isIn { - return errors.New("tls key does not match pinned key") - } - return nil - } - return config -} - -func (t *tcptls) upgradeListener(c net.Conn, options *tcpOptions) (net.Conn, error) { - config := t.configForOptions(options) - conn := tls.Server(c, config) - if err := conn.Handshake(); err != nil { - return c, err - } - return conn, nil -} - -func (t *tcptls) upgradeDialer(c net.Conn, options *tcpOptions) (net.Conn, error) { - config := t.configForOptions(options) - config.ServerName = options.tlsSNI - conn := tls.Client(c, config) - if err := conn.Handshake(); err != nil { - return c, err - } - return conn, nil -} diff --git a/src/multicast/multicast.go b/src/multicast/multicast.go index 24e9d04..d40bcfc 100644 --- a/src/multicast/multicast.go +++ b/src/multicast/multicast.go @@ -45,7 +45,7 @@ type interfaceInfo struct { } type listenerInfo struct { - listener *core.TcpListener + listener *core.Listener time time.Time interval time.Duration port uint16 @@ -219,7 +219,7 @@ func (m *Multicast) _announce() { for name, info := range m._listeners { // Prepare our stop function! stop := func() { - info.listener.Stop() + info.listener.Close() delete(m._listeners, name) m.log.Debugln("No longer multicasting on", name) }