From 6bbd8c1b30865f10448251b52c8b4c3c23740d0c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 5 Jan 2019 12:06:45 +0000 Subject: [PATCH] Rethink channels, more error throwing --- src/yggdrasil/awdl.go | 70 ++++++++++++++++++++++++----------------- src/yggdrasil/core.go | 2 +- src/yggdrasil/mobile.go | 18 ++++++----- 3 files changed, 53 insertions(+), 37 deletions(-) diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index d3d69d5..f38d101 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -1,9 +1,14 @@ package yggdrasil import ( + "errors" + "fmt" "sync" + "sync/atomic" + "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) type awdl struct { @@ -14,8 +19,8 @@ type awdl struct { type awdlInterface struct { awdl *awdl - recv <-chan []byte // traffic received from the network - send chan<- []byte // traffic to send to the network + fromAWDL chan []byte + toAWDL chan []byte shutdown chan bool peer *peer } @@ -29,11 +34,11 @@ func (l *awdl) init(c *Core) error { return nil } -func (l *awdl) create(boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey, name string) *awdlInterface { +func (l *awdl) create(boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey, name string) (*awdlInterface, error) { shared := crypto.GetSharedKey(&l.core.boxPriv, boxPubKey) intf := awdlInterface{ - recv: make(<-chan []byte), - send: make(chan<- []byte), + fromAWDL: make(chan []byte, 32), + toAWDL: make(chan []byte, 32), shutdown: make(chan bool), peer: l.core.peers.newPeer(boxPubKey, sigPubKey, shared, name), } @@ -41,21 +46,21 @@ func (l *awdl) create(boxPubKey *crypto.BoxPubKey, sigPubKey *crypto.SigPubKey, l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() - intf.peer.linkOut = make(chan []byte, 1) + intf.peer.linkOut = make(chan []byte, 1) // protocol traffic intf.peer.out = func(msg []byte) { defer func() { recover() }() - intf.send <- msg - l.core.switchTable.idleIn <- intf.peer.port - } + intf.toAWDL <- msg + } // called by peer.sendPacket() + l.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle intf.peer.close = func() { - close(intf.send) + close(intf.fromAWDL) + close(intf.toAWDL) } - go intf.peer.linkLoop() - go intf.handler() - l.core.switchTable.idleIn <- intf.peer.port - return &intf + go intf.handler() // start listening for packets from switch + go intf.peer.linkLoop() // start link loop + return &intf, nil } - return nil + return nil, errors.New("l.core.peers.newPeer failed") } func (l *awdl) getInterface(identity string) *awdlInterface { @@ -67,45 +72,52 @@ func (l *awdl) getInterface(identity string) *awdlInterface { return nil } -func (l *awdl) shutdown(identity string) { +func (l *awdl) shutdown(identity string) error { if intf, ok := l.interfaces[identity]; ok { intf.shutdown <- true l.core.peers.removePeer(intf.peer.port) l.mutex.Lock() delete(l.interfaces, identity) l.mutex.Unlock() + return nil + } else { + return errors.New(fmt.Sprintf("Interface '%s' doesn't exist or already shutdown", identity)) } } func (ai *awdlInterface) handler() { + send := func(msg []byte) { + ai.toAWDL <- msg + atomic.AddUint64(&ai.peer.bytesSent, uint64(len(msg))) + util.PutBytes(msg) + } for { - /*timerInterval := tcp_ping_interval + timerInterval := tcp_ping_interval timer := time.NewTimer(timerInterval) - defer timer.Stop()*/ + defer timer.Stop() select { case p := <-ai.peer.linkOut: - ai.send <- p - ai.awdl.core.switchTable.idleIn <- ai.peer.port + send(p) continue default: } - /*timer.Stop() + timer.Stop() select { case <-timer.C: default: } - timer.Reset(timerInterval)*/ + timer.Reset(timerInterval) select { - //case _ = <-timer.C: - // ai.send <- nil - case r := <-ai.recv: // traffic received from AWDL + case _ = <-timer.C: + send([]byte{'H', 'E', 'L', 'L', 'O'}) + case p := <-ai.peer.linkOut: + send(p) + continue + case r := <-ai.fromAWDL: ai.peer.handlePacket(r) + ai.awdl.core.switchTable.idleIn <- ai.peer.port case <-ai.shutdown: return - case p := <-ai.peer.linkOut: - ai.send <- p - ai.awdl.core.switchTable.idleIn <- ai.peer.port - continue } } } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index b99d1f2..a1c2127 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -35,7 +35,7 @@ type Core struct { multicast multicast nodeinfo nodeinfo tcp tcpInterface - awdl awdl + awdl awdl log *log.Logger ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this } diff --git a/src/yggdrasil/mobile.go b/src/yggdrasil/mobile.go index a81d82c..1c9a84f 100644 --- a/src/yggdrasil/mobile.go +++ b/src/yggdrasil/mobile.go @@ -108,20 +108,24 @@ func (c *Core) AWDLCreateInterface(boxPubKey string, sigPubKey string, name stri } copy(boxPub[:], boxPubHex) copy(sigPub[:], sigPubHex) - if intf := c.awdl.create(&boxPub, &sigPub, name); intf != nil { - return nil + if intf, err := c.awdl.create(&boxPub, &sigPub, name); err == nil { + if intf != nil { + return err + } else { + return errors.New("c.awdl.create didn't return an interface") + } } else { - return errors.New("No interface was created") + return err } } -func (c *Core) AWDLShutdownInterface(name string) { - c.awdl.shutdown(name) +func (c *Core) AWDLShutdownInterface(name string) error { + return c.awdl.shutdown(name) } func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { if intf := c.awdl.getInterface(identity); intf != nil { - return <-intf.recv, nil + return <-intf.toAWDL, nil } return nil, errors.New("identity not known: " + identity) } @@ -129,7 +133,7 @@ func (c *Core) AWDLRecvPacket(identity string) ([]byte, error) { func (c *Core) AWDLSendPacket(identity string, buf []byte) error { packet := append(util.GetBytes(), buf[:]...) if intf := c.awdl.getInterface(identity); intf != nil { - intf.send <- packet + intf.fromAWDL <- packet return nil } return errors.New("identity not known: " + identity)