5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 05:10:26 +00:00
This commit is contained in:
Arceliar 2020-05-23 14:08:31 -05:00
commit 0188f14caa
3 changed files with 90 additions and 28 deletions

View File

@ -145,7 +145,8 @@ func (c *Conn) search() error {
} }
// Used in session keep-alive traffic // Used in session keep-alive traffic
func (c *Conn) doSearch() { func (c *Conn) _doSearch() {
s := fmt.Sprintf("conn=%p", c)
routerWork := func() { routerWork := func() {
// Check to see if there is a search already matching the destination // Check to see if there is a search already matching the destination
sinfo, isIn := c.core.router.searches.searches[*c.nodeID] sinfo, isIn := c.core.router.searches.searches[*c.nodeID]
@ -153,7 +154,7 @@ func (c *Conn) doSearch() {
// Nothing was found, so create a new search // Nothing was found, so create a new search
searchCompleted := func(sinfo *sessionInfo, e error) {} searchCompleted := func(sinfo *sessionInfo, e error) {}
sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo = c.core.router.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted)
c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) c.core.log.Debugf("%s DHT search started: %p", s, sinfo)
// Start the search // Start the search
sinfo.startSearch() sinfo.startSearch()
} }
@ -268,7 +269,7 @@ func (c *Conn) _write(msg FlowKeyMessage) error {
case time.Since(c.session.time) > 6*time.Second: case time.Since(c.session.time) > 6*time.Second:
if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second { if c.session.time.Before(c.session.pingTime) && time.Since(c.session.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct // TODO double check that the above condition is correct
c.doSearch() c._doSearch()
} else { } else {
c.session.ping(c.session) // TODO send from self if this becomes an actor c.session.ping(c.session) // TODO send from self if this becomes an actor
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/util"
"golang.org/x/net/proxy"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -50,6 +51,7 @@ type linkInterface struct {
name string name string
link *link link *link
peer *peer peer *peer
options linkOptions
msgIO linkInterfaceMsgIO msgIO linkInterfaceMsgIO
info linkInfo info linkInfo
incoming bool incoming bool
@ -67,6 +69,11 @@ type linkInterface struct {
unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up)
} }
type linkOptions struct {
pinnedCurve25519Keys map[crypto.BoxPubKey]struct{}
pinnedEd25519Keys map[crypto.SigPubKey]struct{}
}
func (l *link) init(c *Core) error { func (l *link) init(c *Core) error {
l.core = c l.core = c
l.mutex.Lock() l.mutex.Lock()
@ -92,13 +99,41 @@ func (l *link) call(uri string, sintf string) error {
return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err) return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
} }
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/") pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
tcpOpts := tcpOptions{}
if pubkeys, ok := u.Query()["curve25519"]; ok && len(pubkeys) > 0 {
tcpOpts.pinnedCurve25519Keys = make(map[crypto.BoxPubKey]struct{})
for _, pubkey := range pubkeys {
if boxPub, err := hex.DecodeString(pubkey); err == nil {
var boxPubKey crypto.BoxPubKey
copy(boxPubKey[:], boxPub)
tcpOpts.pinnedCurve25519Keys[boxPubKey] = struct{}{}
}
}
}
if pubkeys, ok := u.Query()["ed25519"]; ok && len(pubkeys) > 0 {
tcpOpts.pinnedEd25519Keys = make(map[crypto.SigPubKey]struct{})
for _, pubkey := range pubkeys {
if sigPub, err := hex.DecodeString(pubkey); err == nil {
var sigPubKey crypto.SigPubKey
copy(sigPubKey[:], sigPub)
tcpOpts.pinnedEd25519Keys[sigPubKey] = struct{}{}
}
}
}
switch u.Scheme { switch u.Scheme {
case "tcp": case "tcp":
l.tcp.call(u.Host, nil, sintf, nil) l.tcp.call(u.Host, tcpOpts, sintf)
case "socks": case "socks":
l.tcp.call(pathtokens[0], u.Host, sintf, nil) tcpOpts.socksProxyAddr = u.Host
if u.User != nil {
tcpOpts.socksProxyAuth = &proxy.Auth{}
tcpOpts.socksProxyAuth.User = u.User.Username()
tcpOpts.socksProxyAuth.Password, _ = u.User.Password()
}
l.tcp.call(pathtokens[0], tcpOpts, sintf)
case "tls": case "tls":
l.tcp.call(u.Host, nil, sintf, l.tcp.tls.forDialer) tcpOpts.upgrade = l.tcp.tls.forDialer
l.tcp.call(u.Host, tcpOpts, sintf)
default: default:
return errors.New("unknown call scheme: " + u.Scheme) return errors.New("unknown call scheme: " + u.Scheme)
} }
@ -122,12 +157,13 @@ func (l *link) listen(uri string) error {
} }
} }
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*linkInterface, error) {
// Technically anything unique would work for names, but let's pick something human readable, just for debugging // Technically anything unique would work for names, but let's pick something human readable, just for debugging
intf := linkInterface{ intf := linkInterface{
name: name, name: name,
link: l, link: l,
msgIO: msgIO, options: options,
msgIO: msgIO,
info: linkInfo{ info: linkInfo{
linkType: linkType, linkType: linkType,
local: local, local: local,
@ -181,6 +217,20 @@ func (intf *linkInterface) handler() error {
intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
return errors.New("failed to connect: wrong version") return errors.New("failed to connect: wrong version")
} }
// Check if the remote side matches the keys we expected. This is a bit of a weak
// check - in future versions we really should check a signature or something like that.
if pinned := intf.options.pinnedCurve25519Keys; pinned != nil {
if _, allowed := pinned[meta.box]; !allowed {
intf.link.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name)
return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys")
}
}
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
if _, allowed := pinned[meta.sig]; !allowed {
intf.link.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
}
}
// Check if we're authorized to connect to this key / IP // Check if we're authorized to connect to this key / IP
if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",

View File

@ -57,6 +57,14 @@ type TcpUpgrade struct {
name string name string
} }
type tcpOptions struct {
linkOptions
upgrade *TcpUpgrade
socksProxyAddr string
socksProxyAuth *proxy.Auth
socksPeerAddr string
}
func (l *TcpListener) Stop() { func (l *TcpListener) Stop() {
defer func() { recover() }() defer func() { recover() }()
close(l.stop) close(l.stop)
@ -221,7 +229,10 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
return return
} }
t.waitgroup.Add(1) t.waitgroup.Add(1)
go t.handler(sock, true, nil, l.upgrade) options := tcpOptions{
upgrade: l.upgrade,
}
go t.handler(sock, true, options)
} }
} }
@ -239,12 +250,12 @@ func (t *tcp) startCalling(saddr string) bool {
// If the dial is successful, it launches the handler. // If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later. // When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns. // This all happens in a separate goroutine that it spawns.
func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *TcpUpgrade) { func (t *tcp) call(saddr string, options tcpOptions, sintf string) {
go func() { go func() {
callname := saddr callname := saddr
callproto := "TCP" callproto := "TCP"
if upgrade != nil { if options.upgrade != nil {
callproto = strings.ToUpper(upgrade.name) callproto = strings.ToUpper(options.upgrade.name)
} }
if sintf != "" { if sintf != "" {
callname = fmt.Sprintf("%s/%s/%s", callproto, saddr, sintf) callname = fmt.Sprintf("%s/%s/%s", callproto, saddr, sintf)
@ -263,17 +274,16 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
}() }()
var conn net.Conn var conn net.Conn
var err error var err error
socksaddr, issocks := options.(string) if options.socksProxyAddr != "" {
if issocks {
if sintf != "" { if sintf != "" {
return return
} }
dialerdst, er := net.ResolveTCPAddr("tcp", socksaddr) dialerdst, er := net.ResolveTCPAddr("tcp", options.socksProxyAddr)
if er != nil { if er != nil {
return return
} }
var dialer proxy.Dialer var dialer proxy.Dialer
dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), nil, proxy.Direct) dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), options.socksProxyAuth, proxy.Direct)
if err != nil { if err != nil {
return return
} }
@ -282,7 +292,8 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
return return
} }
t.waitgroup.Add(1) t.waitgroup.Add(1)
t.handler(conn, false, saddr, nil) options.socksPeerAddr = conn.RemoteAddr().String()
t.handler(conn, false, options)
} else { } else {
dst, err := net.ResolveTCPAddr("tcp", saddr) dst, err := net.ResolveTCPAddr("tcp", saddr)
if err != nil { if err != nil {
@ -348,19 +359,19 @@ func (t *tcp) call(saddr string, options interface{}, sintf string, upgrade *Tcp
return return
} }
t.waitgroup.Add(1) t.waitgroup.Add(1)
t.handler(conn, false, nil, upgrade) t.handler(conn, false, options)
} }
}() }()
} }
func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade *TcpUpgrade) { func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
defer t.waitgroup.Done() // Happens after sock.close defer t.waitgroup.Done() // Happens after sock.close
defer sock.Close() defer sock.Close()
t.setExtraOptions(sock) t.setExtraOptions(sock)
var upgraded bool var upgraded bool
if upgrade != nil { if options.upgrade != nil {
var err error var err error
if sock, err = upgrade.upgrade(sock); err != nil { if sock, err = options.upgrade.upgrade(sock); err != nil {
t.link.core.log.Errorln("TCP handler upgrade failed:", err) t.link.core.log.Errorln("TCP handler upgrade failed:", err)
return return
} else { } else {
@ -370,14 +381,14 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade
stream := stream{} stream := stream{}
stream.init(sock) stream.init(sock)
var name, proto, local, remote string var name, proto, local, remote string
if socksaddr, issocks := options.(string); issocks { if options.socksProxyAddr != "" {
name = "socks://" + sock.RemoteAddr().String() + "/" + socksaddr name = "socks://" + sock.RemoteAddr().String() + "/" + options.socksPeerAddr
proto = "socks" proto = "socks"
local, _, _ = net.SplitHostPort(sock.LocalAddr().String()) local, _, _ = net.SplitHostPort(sock.LocalAddr().String())
remote, _, _ = net.SplitHostPort(socksaddr) remote, _, _ = net.SplitHostPort(options.socksPeerAddr)
} else { } else {
if upgraded { if upgraded {
proto = upgrade.name proto = options.upgrade.name
name = proto + "://" + sock.RemoteAddr().String() name = proto + "://" + sock.RemoteAddr().String()
} else { } else {
proto = "tcp" proto = "tcp"
@ -387,7 +398,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}, upgrade
remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String()) remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
} }
force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast() force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force) link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions)
if err != nil { if err != nil {
t.link.core.log.Println(err) t.link.core.log.Println(err)
panic(err) panic(err)