diff --git a/contrib/ansible/genkeys.go b/contrib/ansible/genkeys.go index 7df2e58..5213e8b 100644 --- a/contrib/ansible/genkeys.go +++ b/contrib/ansible/genkeys.go @@ -35,22 +35,22 @@ func main() { } var encryptionKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { encryptionKeys = append(encryptionKeys, newBoxKey()) } encryptionKeys = sortKeySetArray(encryptionKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - encryptionKeys[0] = newBoxKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + encryptionKeys[0] = newBoxKey() encryptionKeys = bubbleUpTo(encryptionKeys, 0) } var signatureKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { signatureKeys = append(signatureKeys, newSigKey()) } signatureKeys = sortKeySetArray(signatureKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - signatureKeys[0] = newSigKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + signatureKeys[0] = newSigKey() signatureKeys = bubbleUpTo(signatureKeys, 0) } @@ -112,11 +112,11 @@ func sortKeySetArray(sets []keySet) []keySet { } func bubbleUpTo(sets []keySet, num int) []keySet { - for i := 0; i < len(sets) - num - 1; i++ { - if isBetter(sets[i + 1].id, sets[i].id) { + for i := 0; i < len(sets)-num-1; i++ { + if isBetter(sets[i+1].id, sets[i].id) { var tmp = sets[i] - sets[i] = sets[i + 1] - sets[i + 1] = tmp + sets[i] = sets[i+1] + sets[i+1] = tmp } else { break } diff --git a/go.mod b/go.mod index 3e8db51..995e54c 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,10 @@ module github.com/yggdrasil-network/yggdrasil-go require ( github.com/docker/libcontainer v2.2.1+incompatible + github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 github.com/mitchellh/mapstructure v1.1.2 - github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/songgao/packets v0.0.0-20160404182456-549a10cd4091 github.com/yggdrasil-network/water v0.0.0-20180615095340-f732c88f34ae golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 diff --git a/go.sum b/go.sum index 17b1017..92dfe88 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/docker/libcontainer v2.2.1+incompatible h1:++SbbkCw+X8vAd4j2gOCzZ2Nn7s2xFALTf7LZKmM1/0= github.com/docker/libcontainer v2.2.1+incompatible/go.mod h1:osvj61pYsqhNCMLGX31xr7klUBhHb/ZBuXS0o1Fvwbw= +github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= +github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 h1:xmvkbxXDeN1ffWq8kvrhyqVYAO2aXuRBsbpxVTR+JyU= github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio= github.com/kardianos/minwinsvc v0.0.0-20151122163309-cad6b2b879b0 h1:YnZmFjg0Nvk8851WTVWlqMC1ecJH07Ctz+Ezxx4u54g= @@ -18,5 +20,3 @@ golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dz golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= -github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= diff --git a/src/util/util.go b/src/util/util.go index 65e6d46..45be3b1 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -3,6 +3,7 @@ package util // These are misc. utility functions that didn't really fit anywhere else import "runtime" +import "time" // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. func Yield() { @@ -44,3 +45,14 @@ func PutBytes(bs []byte) { default: } } + +// This is a workaround to go's broken timer implementation +func TimerStop(t *time.Timer) bool { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return true +} diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 190d025..4236688 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -2,147 +2,97 @@ package yggdrasil import ( "errors" - "fmt" + "io" "sync" - "sync/atomic" - "time" - - "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) type awdl struct { - core *Core + link *link mutex sync.RWMutex // protects interfaces below interfaces map[string]*awdlInterface } type awdlInterface struct { - awdl *awdl - fromAWDL chan []byte - toAWDL chan []byte - shutdown chan bool - peer *peer + linkif *linkInterface + rwc awdlReadWriteCloser + peer *peer + stream stream } -func (l *awdl) init(c *Core) error { - l.core = c - l.mutex.Lock() - l.interfaces = make(map[string]*awdlInterface) - l.mutex.Unlock() +type awdlReadWriteCloser struct { + fromAWDL chan []byte + toAWDL chan []byte +} + +func (c awdlReadWriteCloser) Read(p []byte) (n int, err error) { + if packet, ok := <-c.fromAWDL; ok { + n = copy(p, packet) + return n, nil + } + return 0, io.EOF +} + +func (c awdlReadWriteCloser) Write(p []byte) (n int, err error) { + var pc []byte + pc = append(pc, p...) + c.toAWDL <- pc + return len(pc), nil +} + +func (c awdlReadWriteCloser) Close() error { + close(c.fromAWDL) + close(c.toAWDL) + return nil +} + +func (a *awdl) init(l *link) error { + a.link = l + a.mutex.Lock() + a.interfaces = make(map[string]*awdlInterface) + a.mutex.Unlock() return nil } -func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte /*boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey*/, name string) (*awdlInterface, error) { +func (a *awdl) create(name, local, remote string, incoming bool) (*awdlInterface, error) { + rwc := awdlReadWriteCloser{ + fromAWDL: make(chan []byte, 1), + toAWDL: make(chan []byte, 1), + } + s := stream{} + s.init(rwc) + linkif, err := a.link.create(&s, name, "awdl", local, remote, incoming, true) + if err != nil { + return nil, err + } intf := awdlInterface{ - awdl: l, - fromAWDL: fromAWDL, - toAWDL: toAWDL, - shutdown: make(chan bool), + linkif: linkif, + rwc: rwc, } - l.mutex.Lock() - l.interfaces[name] = &intf - l.mutex.Unlock() - myLinkPub, myLinkPriv := crypto.NewBoxKeys() - meta := version_getBaseMetadata() - meta.box = l.core.boxPub - meta.sig = l.core.sigPub - meta.link = *myLinkPub - metaBytes := meta.encode() - l.core.log.Traceln("toAWDL <- metaBytes") - toAWDL <- metaBytes - l.core.log.Traceln("metaBytes = <-fromAWDL") - metaBytes = <-fromAWDL - l.core.log.Traceln("version_metadata{}") - meta = version_metadata{} - if !meta.decode(metaBytes) || !meta.check() { - return nil, errors.New("Metadata decode failure") - } - l.core.log.Traceln("version_getBaseMetadata{}") - base := version_getBaseMetadata() - if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - return nil, errors.New("Failed to connect to node: " + name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) - } - l.core.log.Traceln("crypto.GetSharedKey") - shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - //shared := crypto.GetSharedKey(&l.core.boxPriv, boxPubKey) - l.core.log.Traceln("l.core.peers.newPeer") - intf.peer = l.core.peers.newPeer(&meta.box, &meta.sig, shared, name) - if intf.peer != nil { - intf.peer.linkOut = make(chan []byte, 1) // protocol traffic - intf.peer.out = func(msg []byte) { - defer func() { recover() }() - intf.toAWDL <- msg - } // called by peer.sendPacket() - l.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - intf.peer.close = func() { - close(intf.fromAWDL) - close(intf.toAWDL) - } - go intf.handler() - go intf.peer.linkLoop() - return &intf, nil - } - delete(l.interfaces, name) - return nil, errors.New("l.core.peers.newPeer failed") + a.mutex.Lock() + a.interfaces[name] = &intf + a.mutex.Unlock() + go intf.linkif.handler() + return &intf, nil } -func (l *awdl) getInterface(identity string) *awdlInterface { - l.mutex.RLock() - defer l.mutex.RUnlock() - if intf, ok := l.interfaces[identity]; ok { +func (a *awdl) getInterface(identity string) *awdlInterface { + a.mutex.RLock() + defer a.mutex.RUnlock() + if intf, ok := a.interfaces[identity]; ok { return intf } return nil } -func (l *awdl) shutdown(identity string) error { - if intf, ok := l.interfaces[identity]; ok { - intf.shutdown <- true - l.core.peers.removePeer(intf.peer.port) - l.mutex.Lock() - delete(l.interfaces, identity) - l.mutex.Unlock() +func (a *awdl) shutdown(identity string) error { + if intf, ok := a.interfaces[identity]; ok { + close(intf.linkif.closed) + intf.rwc.Close() + a.mutex.Lock() + delete(a.interfaces, identity) + a.mutex.Unlock() return nil - } else { - return errors.New(fmt.Sprintf("Interface '%s' doesn't exist or already shutdown", identity)) - } -} - -func (ai *awdlInterface) handler() { - send := func(msg []byte) { - ai.toAWDL <- msg - atomic.AddUint64(&ai.peer.bytesSent, uint64(len(msg))) - util.PutBytes(msg) - } - for { - timerInterval := tcp_ping_interval - timer := time.NewTimer(timerInterval) - defer timer.Stop() - select { - case p := <-ai.peer.linkOut: - send(p) - continue - default: - } - timer.Stop() - select { - case <-timer.C: - default: - } - timer.Reset(timerInterval) - select { - case _ = <-timer.C: - send([]byte{}) - case p := <-ai.peer.linkOut: - send(p) - continue - case r := <-ai.fromAWDL: - ai.peer.handlePacket(r) - ai.awdl.core.switchTable.idleIn <- ai.peer.port - case <-ai.shutdown: - return - } } + return errors.New("Interface not found or already closed") } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index c78bc1f..2e23dd1 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -45,7 +45,7 @@ type Core struct { searches searches multicast multicast tcp tcpInterface - awdl awdl + link link log *log.Logger } @@ -199,8 +199,8 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.awdl.init(c); err != nil { - c.log.Errorln("Failed to start AWDL interface") + if err := c.link.init(c); err != nil { + c.log.Errorln("Failed to start link interfaces") return err } diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index f7319e4..94faba4 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -97,9 +97,7 @@ func (c *Core) DEBUG_getPeers() *peers { } func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { - //in <-chan []byte, - //out chan<- []byte) *peer { - return ps.newPeer(&box, &sig, &link, "(simulator)") //, in, out) + return ps.newPeer(&box, &sig, &link, "(simulator)", nil) } /* diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go new file mode 100644 index 0000000..be98dd9 --- /dev/null +++ b/src/yggdrasil/link.go @@ -0,0 +1,311 @@ +package yggdrasil + +import ( + "encoding/hex" + "errors" + "fmt" + "net" + "strings" + "sync" + //"sync/atomic" + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/address" + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) + +type link struct { + core *Core + mutex sync.RWMutex // protects interfaces below + interfaces map[linkInfo]*linkInterface + awdl awdl // AWDL interface support + // TODO timeout (to remove from switch), read from config.ReadTimeout +} + +type linkInfo struct { + box crypto.BoxPubKey // Their encryption key + sig crypto.SigPubKey // Their signing key + linkType string // Type of link, e.g. TCP, AWDL + local string // Local name or address + remote string // Remote name or address +} + +type linkInterfaceMsgIO interface { + readMsg() ([]byte, error) + writeMsg([]byte) (int, error) + close() error + // These are temporary workarounds to stream semantics + _sendMetaBytes([]byte) error + _recvMetaBytes() ([]byte, error) +} + +type linkInterface struct { + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + incoming bool + force bool + closed chan struct{} +} + +func (l *link) init(c *Core) error { + l.core = c + l.mutex.Lock() + l.interfaces = make(map[linkInfo]*linkInterface) + l.mutex.Unlock() + + if err := l.awdl.init(l); err != nil { + l.core.log.Errorln("Failed to start AWDL interface") + return err + } + + return nil +} + +func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { + // Technically anything unique would work for names, but lets pick something human readable, just for debugging + intf := linkInterface{ + name: name, + link: l, + msgIO: msgIO, + info: linkInfo{ + linkType: linkType, + local: local, + remote: remote, + }, + incoming: incoming, + force: force, + } + return &intf, nil +} + +func (intf *linkInterface) handler() error { + // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later + myLinkPub, myLinkPriv := crypto.NewBoxKeys() + meta := version_getBaseMetadata() + meta.box = intf.link.core.boxPub + meta.sig = intf.link.core.sigPub + meta.link = *myLinkPub + metaBytes := meta.encode() + // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) + err := intf.msgIO._sendMetaBytes(metaBytes) + if err != nil { + return err + } + metaBytes, err = intf.msgIO._recvMetaBytes() + if err != nil { + return err + } + meta = version_metadata{} + if !meta.decode(metaBytes) || !meta.check() { + return errors.New("failed to decode metadata") + } + base := version_getBaseMetadata() + if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { + intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + return errors.New("failed to connect: wrong version") + } + // Check if we're authorized to connect to this key / IP + if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { + intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", + strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) + intf.msgIO.close() + return nil + } + // Check if we already have a link to this node + intf.info.box = meta.box + intf.info.sig = meta.sig + intf.link.mutex.Lock() + if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { + intf.link.mutex.Unlock() + // FIXME we should really return an error and let the caller block instead + // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. + intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name) + intf.msgIO.close() + <-oldIntf.closed + return nil + } else { + intf.closed = make(chan struct{}) + intf.link.interfaces[intf.info] = intf + defer func() { + intf.link.mutex.Lock() + delete(intf.link.interfaces, intf.info) + intf.link.mutex.Unlock() + close(intf.closed) + }() + intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) + } + intf.link.mutex.Unlock() + // Create peer + shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) + if intf.peer == nil { + return errors.New("failed to create peer") + } + defer func() { + // More cleanup can go here + intf.link.core.peers.removePeer(intf.peer.port) + }() + // Finish setting up the peer struct + out := make(chan []byte, 1) + defer close(out) + intf.peer.out = func(msg []byte) { + defer func() { recover() }() + out <- msg + } + intf.peer.linkOut = make(chan []byte, 1) + themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) + themAddrString := net.IP(themAddr[:]).String() + themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) + intf.link.core.log.Infof("Connected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + defer intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + // Start the link loop + go intf.peer.linkLoop() + // Start the writer + signalReady := make(chan struct{}, 1) + signalSent := make(chan bool, 1) + sendAck := make(chan struct{}, 1) + go func() { + defer close(signalReady) + defer close(signalSent) + interval := 4 * time.Second + tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp + defer util.TimerStop(tcpTimer) + send := func(bs []byte) { + intf.msgIO.writeMsg(bs) + select { + case signalSent <- len(bs) > 0: + default: + } + } + for { + // First try to send any link protocol traffic + select { + case msg := <-intf.peer.linkOut: + send(msg) + continue + default: + } + // No protocol traffic to send, so reset the timer + util.TimerStop(tcpTimer) + tcpTimer.Reset(interval) + // Now block until something is ready or the timer triggers keepalive traffic + select { + case <-tcpTimer.C: + intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case <-sendAck: + intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + send(nil) + case msg := <-intf.peer.linkOut: + intf.msgIO.writeMsg(msg) + case msg, ok := <-out: + if !ok { + return + } + send(msg) + util.PutBytes(msg) + select { + case signalReady <- struct{}{}: + default: + } + } + } + }() + //intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + // Used to enable/disable activity in the switch + signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive + defer close(signalAlive) + go func() { + var isAlive bool + var isReady bool + var sendTimerRunning bool + var recvTimerRunning bool + recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed + sendTime := time.Second + sendTimer := time.NewTimer(sendTime) + defer util.TimerStop(sendTimer) + recvTimer := time.NewTimer(recvTime) + defer util.TimerStop(recvTimer) + for { + select { + case gotMsg, ok := <-signalAlive: + if !ok { + return + } + if !isAlive { + isAlive = true + if !isReady { + // (Re-)enable in the switch + isReady = true + intf.link.core.switchTable.idleIn <- intf.peer.port + } + } + if gotMsg && !sendTimerRunning { + // We got a message + // Start a timer, if it expires then send a 0-sized ack to let them know we're alive + util.TimerStop(sendTimer) + sendTimer.Reset(sendTime) + sendTimerRunning = true + } + case sentMsg, ok := <-signalSent: + // Stop any running ack timer + if !ok { + return + } + util.TimerStop(sendTimer) + sendTimerRunning = false + if sentMsg && !recvTimerRunning { + // We sent a message + // Start a timer, if it expires and we haven't gotten any return traffic (including a 0-sized ack), then assume there's a problem + util.TimerStop(recvTimer) + recvTimer.Reset(recvTime) + recvTimerRunning = true + } + case _, ok := <-signalReady: + if !ok { + return + } + if !isAlive || !isReady { + // Disable in the switch + isReady = false + } else { + // Keep enabled in the switch + intf.link.core.switchTable.idleIn <- intf.peer.port + } + case <-sendTimer.C: + // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive + select { + case sendAck <- struct{}{}: + default: + } + case <-recvTimer.C: + // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding + isAlive = false + } + } + }() + // Run reader loop + for { + msg, err := intf.msgIO.readMsg() + if len(msg) > 0 { + intf.peer.handlePacket(msg) + } + if err != nil { + return err + } + select { + case signalAlive <- len(msg) > 0: + default: + } + } + //////////////////////////////////////////////////////////////////////////////// + return nil +} diff --git a/src/yggdrasil/mobile.go b/src/yggdrasil/mobile.go index 220e6ca..76fbe54 100644 --- a/src/yggdrasil/mobile.go +++ b/src/yggdrasil/mobile.go @@ -5,11 +5,11 @@ package yggdrasil import ( "encoding/hex" "encoding/json" - "log" "os" - "regexp" "time" + "github.com/gologme/log" + hjson "github.com/hjson/hjson-go" "github.com/mitchellh/mapstructure" "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -52,10 +52,6 @@ func (c *Core) StartAutoconfigure() error { if hostname, err := os.Hostname(); err == nil { nc.NodeInfo = map[string]interface{}{"name": hostname} } - ifceExpr, err := regexp.Compile(".*") - if err == nil { - c.ifceExpr = append(c.ifceExpr, ifceExpr) - } if err := c.Start(nc, logger); err != nil { return err } @@ -77,13 +73,6 @@ func (c *Core) StartJSON(configjson []byte) error { return err } nc.IfName = "dummy" - for _, ll := range nc.MulticastInterfaces { - ifceExpr, err := regexp.Compile(ll) - if err != nil { - panic(err) - } - c.AddMulticastInterfaceExpr(ifceExpr) - } if err := c.Start(nc, logger); err != nil { return err } diff --git a/src/yggdrasil/mobile_ios.go b/src/yggdrasil/mobile_ios.go index 72920fe..7b08999 100644 --- a/src/yggdrasil/mobile_ios.go +++ b/src/yggdrasil/mobile_ios.go @@ -29,39 +29,33 @@ func (nsl MobileLogger) Write(p []byte) (n int, err error) { return len(p), nil } -func (c *Core) AWDLCreateInterface(name string) error { - fromAWDL := make(chan []byte, 32) - toAWDL := make(chan []byte, 32) - - if intf, err := c.awdl.create(fromAWDL, toAWDL, name); err == nil { - if intf != nil { - c.log.Println(err) - return err - } else { - c.log.Println("c.awdl.create didn't return an interface") - return errors.New("c.awdl.create didn't return an interface") - } - } else { - c.log.Println(err) +func (c *Core) AWDLCreateInterface(name, local, remote string, incoming bool) error { + if intf, err := c.link.awdl.create(name, local, remote, incoming); err != nil || intf == nil { + c.log.Println("c.link.awdl.create:", err) return err } + return nil } func (c *Core) AWDLShutdownInterface(name string) error { - return c.awdl.shutdown(name) + return c.link.awdl.shutdown(name) } func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { - if intf := c.awdl.getInterface(identity); intf != nil { - return <-intf.toAWDL, nil + if intf := c.link.awdl.getInterface(identity); intf != nil { + read, ok := <-intf.rwc.toAWDL + if !ok { + return nil, errors.New("AWDLRecvPacket: channel closed") + } + return read, nil } return nil, errors.New("AWDLRecvPacket identity not known: " + identity) } func (c *Core) AWDLSendPacket(identity string, buf []byte) error { packet := append(util.GetBytes(), buf[:]...) - if intf := c.awdl.getInterface(identity); intf != nil { - intf.fromAWDL <- packet + if intf := c.link.awdl.getInterface(identity); intf != nil { + intf.rwc.fromAWDL <- packet return nil } return errors.New("AWDLSendPacket identity not known: " + identity) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 2cd1afe..237d6f6 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -113,7 +113,7 @@ type peer struct { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string) *peer { +func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -123,6 +123,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare firstSeen: now, doSend: make(chan struct{}, 1), dinfo: make(chan *dhtInfo, 1), + close: closer, core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -250,6 +251,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { func (p *peer) sendPacket(packet []byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? + atomic.AddUint64(&p.bytesSent, uint64(len(packet))) p.out(packet) } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index d505936..99e6982 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,7 @@ func (r *router) init(core *Core) { r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) in := make(chan []byte, 32) // TODO something better than this... - p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)") + p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) p.out = func(packet []byte) { // This is to make very sure it never blocks select { diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go new file mode 100644 index 0000000..db2cdf7 --- /dev/null +++ b/src/yggdrasil/stream.go @@ -0,0 +1,141 @@ +package yggdrasil + +import ( + "errors" + "fmt" + "io" + + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) + +// Test that this matches the interface we expect +var _ = linkInterfaceMsgIO(&stream{}) + +type stream struct { + rwc io.ReadWriteCloser + inputBuffer []byte // Incoming packet stream +} + +func (s *stream) close() error { + return s.rwc.Close() +} + +const streamMsgSize = 2048 + 65535 + +var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" + +func (s *stream) init(rwc io.ReadWriteCloser) { + // TODO have this also do the metadata handshake and create the peer struct + s.rwc = rwc + // TODO call something to do the metadata exchange +} + +// writeMsg writes a message with stream padding, and is *not* thread safe. +func (s *stream) writeMsg(bs []byte) (int, error) { + buf := util.GetBytes() + defer util.PutBytes(buf) + buf = append(buf, streamMsg[:]...) + buf = append(buf, wire_encode_uint64(uint64(len(bs)))...) + padLen := len(buf) + buf = append(buf, bs...) + var bn int + for bn < len(buf) { + n, err := s.rwc.Write(buf[bn:]) + bn += n + if err != nil { + l := bn - padLen + if l < 0 { + l = 0 + } + return l, err + } + } + return len(bs), nil +} + +// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. +func (s *stream) readMsg() ([]byte, error) { + for { + buf := s.inputBuffer + msg, ok, err := stream_chopMsg(&buf) + switch { + case err != nil: + // Something in the stream format is corrupt + return nil, fmt.Errorf("message error: %v", err) + case ok: + // Copy the packet into bs, shift the buffer, and return + msg = append(util.GetBytes(), msg...) + s.inputBuffer = append(s.inputBuffer[:0], buf...) + return msg, nil + default: + // Wait for the underlying reader to return enough info for us to proceed + frag := make([]byte, 2*streamMsgSize) + n, err := s.rwc.Read(frag) + if n > 0 { + s.inputBuffer = append(s.inputBuffer, frag[:n]...) + } else if err != nil { + return nil, err + } + } + } +} + +// Writes metadata bytes without stream padding, meant to be temporary +func (s *stream) _sendMetaBytes(metaBytes []byte) error { + var written int + for written < len(metaBytes) { + n, err := s.rwc.Write(metaBytes) + written += n + if err != nil { + return err + } + } + return nil +} + +// Reads metadata bytes without stream padding, meant to be temporary +func (s *stream) _recvMetaBytes() ([]byte, error) { + var meta version_metadata + frag := meta.encode() + metaBytes := make([]byte, 0, len(frag)) + for len(metaBytes) < len(frag) { + n, err := s.rwc.Read(frag) + if err != nil { + return nil, err + } + metaBytes = append(metaBytes, frag[:n]...) + } + return metaBytes, nil +} + +// This takes a pointer to a slice as an argument. It checks if there's a +// complete message and, if so, slices out those parts and returns the message, +// true, and nil. If there's no error, but also no complete message, it returns +// nil, false, and nil. If there's an error, it returns nil, false, and the +// error, which the reader then handles (currently, by returning from the +// reader, which causes the connection to close). +func stream_chopMsg(bs *[]byte) ([]byte, bool, error) { + // Returns msg, ok, err + if len(*bs) < len(streamMsg) { + return nil, false, nil + } + for idx := range streamMsg { + if (*bs)[idx] != streamMsg[idx] { + return nil, false, errors.New("bad message") + } + } + msgLen, msgLenLen := wire_decode_uint64((*bs)[len(streamMsg):]) + if msgLen > streamMsgSize { + return nil, false, errors.New("oversized message") + } + msgBegin := len(streamMsg) + msgLenLen + msgEnd := msgBegin + int(msgLen) + if msgLenLen == 0 || len(*bs) < msgEnd { + // We don't have the full message + // Need to buffer this and wait for the rest to come in + return nil, false, nil + } + msg := (*bs)[msgBegin:msgEnd] + (*bs) = (*bs)[msgEnd:] + return msg, true, nil +} diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f2adf3f..db39d01 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -281,6 +281,7 @@ func (t *switchTable) cleanPeers() { if now.Sub(peer.time) > switch_timeout+switch_throttle { // Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding. delete(t.data.peers, port) + go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe } } if _, isIn := t.data.peers[t.parent]; !isIn { diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index a83213d..979bc81 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -16,34 +16,28 @@ package yggdrasil import ( "context" - "errors" "fmt" - "io" "math/rand" "net" "sync" - "sync/atomic" "time" "golang.org/x/net/proxy" - "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" ) -const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense -const default_tcp_timeout = 6 * time.Second -const tcp_ping_interval = (default_tcp_timeout * 2 / 3) +const default_timeout = 6 * time.Second +const tcp_ping_interval = (default_timeout * 2 / 3) // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core reconfigure chan chan error serv net.Listener - serv_stop chan bool - tcp_timeout time.Duration - tcp_addr string + stop chan bool + timeout time.Duration + addr string mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) @@ -86,7 +80,7 @@ func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) { // Initializes the struct. func (iface *tcpInterface) init(core *Core) (err error) { iface.core = core - iface.serv_stop = make(chan bool, 1) + iface.stop = make(chan bool, 1) iface.reconfigure = make(chan chan error, 1) go func() { for { @@ -95,7 +89,7 @@ func (iface *tcpInterface) init(core *Core) (err error) { updated := iface.core.config.Listen != iface.core.configOld.Listen iface.core.configMutex.RUnlock() if updated { - iface.serv_stop <- true + iface.stop <- true iface.serv.Close() e <- iface.listen() } else { @@ -111,19 +105,19 @@ func (iface *tcpInterface) listen() error { var err error iface.core.configMutex.RLock() - iface.tcp_addr = iface.core.config.Listen - iface.tcp_timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond + iface.addr = iface.core.config.Listen + iface.timeout = time.Duration(iface.core.config.ReadTimeout) * time.Millisecond iface.core.configMutex.RUnlock() - if iface.tcp_timeout >= 0 && iface.tcp_timeout < default_tcp_timeout { - iface.tcp_timeout = default_tcp_timeout + if iface.timeout >= 0 && iface.timeout < default_timeout { + iface.timeout = default_timeout } ctx := context.Background() lc := net.ListenConfig{ Control: iface.tcpContext, } - iface.serv, err = lc.Listen(ctx, "tcp", iface.tcp_addr) + iface.serv, err = lc.Listen(ctx, "tcp", iface.addr) if err == nil { iface.mutex.Lock() iface.calls = make(map[string]struct{}) @@ -147,7 +141,7 @@ func (iface *tcpInterface) listener() { return } select { - case <-iface.serv_stop: + case <-iface.stop: iface.core.log.Errorln("Stopping listener") return default: @@ -186,7 +180,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { if sintf != "" { callname = fmt.Sprintf("%s/%s", saddr, sintf) } - if iface.isAlreadyCalling(saddr) { + if iface.isAlreadyCalling(callname) { return } iface.mutex.Lock() @@ -194,7 +188,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { iface.mutex.Unlock() defer func() { // Block new calls for a little while, to mitigate livelock scenarios - time.Sleep(default_tcp_timeout) + time.Sleep(default_timeout) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) iface.mutex.Lock() delete(iface.calls, callname) @@ -283,245 +277,21 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { }() } -// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. -// It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() iface.setExtraOptions(sock) - // Get our keys - myLinkPub, myLinkPriv := crypto.NewBoxKeys() // ephemeral link keys - meta := version_getBaseMetadata() - meta.box = iface.core.boxPub - meta.sig = iface.core.sigPub - meta.link = *myLinkPub - metaBytes := meta.encode() - _, err := sock.Write(metaBytes) + stream := stream{} + stream.init(sock) + local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) + remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) + remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() + name := "tcp://" + sock.RemoteAddr().String() + link, err := iface.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) if err != nil { - return + iface.core.log.Println(err) + panic(err) } - if iface.tcp_timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout)) - } - _, err = sock.Read(metaBytes) - if err != nil { - return - } - meta = version_metadata{} // Reset to zero value - if !meta.decode(metaBytes) || !meta.check() { - // Failed to decode and check the metadata - // If it's a version mismatch issue, then print an error message - base := version_getBaseMetadata() - if meta.meta == base.meta { - if meta.ver > base.ver { - iface.core.log.Errorln("Failed to connect to node:", sock.RemoteAddr().String(), "version:", meta.ver) - } else if meta.ver == base.ver && meta.minorVer > base.minorVer { - iface.core.log.Errorln("Failed to connect to node:", sock.RemoteAddr().String(), "version:", fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) - } - } - // TODO? Block forever to prevent future connection attempts? suppress future messages about the same node? - return - } - remoteAddr, _, e1 := net.SplitHostPort(sock.RemoteAddr().String()) - localAddr, _, e2 := net.SplitHostPort(sock.LocalAddr().String()) - if e1 != nil || e2 != nil { - return - } - info := tcpInfo{ // used as a map key, so don't include ephemeral link key - box: meta.box, - sig: meta.sig, - localAddr: localAddr, - remoteAddr: remoteAddr, - } - if iface.isAlreadyConnected(info) { - return - } - // Quit the parent call if this is a connection to ourself - equiv := func(k1, k2 []byte) bool { - for idx := range k1 { - if k1[idx] != k2[idx] { - return false - } - } - return true - } - if equiv(meta.box[:], iface.core.boxPub[:]) { - return - } - if equiv(meta.sig[:], iface.core.sigPub[:]) { - return - } - // Check if we're authorized to connect to this key / IP - if incoming && !iface.core.peers.isAllowedEncryptionPublicKey(&meta.box) { - // Allow unauthorized peers if they're link-local - raddrStr, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - raddr := net.ParseIP(raddrStr) - if !raddr.IsLinkLocalUnicast() { - return - } - } - // Check if we already have a connection to this node, close and block if yes - iface.mutex.Lock() - /*if blockChan, isIn := iface.conns[info]; isIn { - iface.mutex.Unlock() - sock.Close() - <-blockChan - return - }*/ - blockChan := make(chan struct{}) - iface.conns[info] = blockChan - iface.mutex.Unlock() - defer func() { - iface.mutex.Lock() - delete(iface.conns, info) - iface.mutex.Unlock() - close(blockChan) - }() - // Note that multiple connections to the same node are allowed - // E.g. over different interfaces - p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) - p.linkOut = make(chan []byte, 1) - in := func(bs []byte) { - p.handlePacket(bs) - } - out := make(chan []byte, 1) - defer close(out) - go func() { - // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util.PutBytes(msg) - } - timerInterval := tcp_ping_interval - timer := time.NewTimer(timerInterval) - defer timer.Stop() - for { - select { - case msg := <-p.linkOut: - // Always send outgoing link traffic first, if needed - send(msg) - continue - default: - } - // Otherwise wait reset the timer and wait for something to do - timer.Stop() - select { - case <-timer.C: - default: - } - timer.Reset(timerInterval) - select { - case _ = <-timer.C: - send(nil) // TCP keep-alive traffic - case msg := <-p.linkOut: - send(msg) - case msg, ok := <-out: - if !ok { - return - } - send(msg) // Block until the socket write has finished - // Now inform the switch that we're ready for more traffic - p.core.switchTable.idleIn <- p.port - } - } - }() - p.core.switchTable.idleIn <- p.port // Start in the idle state - p.out = func(msg []byte) { - defer func() { recover() }() - out <- msg - } - p.close = func() { sock.Close() } - go p.linkLoop() - defer func() { - // Put all of our cleanup here... - p.core.peers.removePeer(p.port) - }() - us, _, _ := net.SplitHostPort(sock.LocalAddr().String()) - them, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) - themNodeID := crypto.GetNodeID(&meta.box) - themAddr := address.AddrForNodeID(themNodeID) - themAddrString := net.IP(themAddr[:]).String() - themString := fmt.Sprintf("%s@%s", themAddrString, them) - iface.core.log.Infof("Connected: %s, source: %s", themString, us) - err = iface.reader(sock, in) // In this goroutine, because of defers - if err == nil { - iface.core.log.Infof("Disconnected: %s, source: %s", themString, us) - } else { - iface.core.log.Infof("Disconnected: %s, source: %s, error: %s", themString, us, err) - } - return -} - -// This reads from the socket into a []byte buffer for incomping messages. -// It copies completed messages out of the cache into a new slice, and passes them to the peer struct via the provided `in func([]byte)` argument. -// Then it shifts the incomplete fragments of data forward so future reads won't overwrite it. -func (iface *tcpInterface) reader(sock net.Conn, in func([]byte)) error { - bs := make([]byte, 2*tcp_msgSize) - frag := bs[:0] - for { - if iface.tcp_timeout > 0 { - sock.SetReadDeadline(time.Now().Add(iface.tcp_timeout)) - } - n, err := sock.Read(bs[len(frag):]) - if n > 0 { - frag = bs[:len(frag)+n] - for { - msg, ok, err2 := tcp_chop_msg(&frag) - if err2 != nil { - return fmt.Errorf("Message error: %v", err2) - } - if !ok { - // We didn't get the whole message yet - break - } - newMsg := append(util.GetBytes(), msg...) - in(newMsg) - util.Yield() - } - frag = append(bs[:0], frag...) - } - if err != nil || n == 0 { - if err != io.EOF { - return err - } - return nil - } - } -} - -//////////////////////////////////////////////////////////////////////////////// - -// These are 4 bytes of padding used to catch if something went horribly wrong with the tcp connection. -var tcp_msg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" - -// This takes a pointer to a slice as an argument. -// It checks if there's a complete message and, if so, slices out those parts and returns the message, true, and nil. -// If there's no error, but also no complete message, it returns nil, false, and nil. -// If there's an error, it returns nil, false, and the error, which the reader then handles (currently, by returning from the reader, which causes the connection to close). -func tcp_chop_msg(bs *[]byte) ([]byte, bool, error) { - // Returns msg, ok, err - if len(*bs) < len(tcp_msg) { - return nil, false, nil - } - for idx := range tcp_msg { - if (*bs)[idx] != tcp_msg[idx] { - return nil, false, errors.New("Bad message!") - } - } - msgLen, msgLenLen := wire_decode_uint64((*bs)[len(tcp_msg):]) - if msgLen > tcp_msgSize { - return nil, false, errors.New("Oversized message!") - } - msgBegin := len(tcp_msg) + msgLenLen - msgEnd := msgBegin + int(msgLen) - if msgLenLen == 0 || len(*bs) < msgEnd { - // We don't have the full message - // Need to buffer this and wait for the rest to come in - return nil, false, nil - } - msg := (*bs)[msgBegin:msgEnd] - (*bs) = (*bs)[msgEnd:] - return msg, true, nil + iface.core.log.Debugln("DEBUG: starting handler for", name) + err = link.handler() + iface.core.log.Debugln("DEBUG: stopped handler for", name, err) }