5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 03:41:37 +00:00

more TunAdapter migration

This commit is contained in:
Arceliar 2019-08-25 18:53:11 -05:00
parent aaf34c6304
commit b2a2e251ad
3 changed files with 84 additions and 78 deletions

View File

@ -27,12 +27,10 @@ type tunConn struct {
} }
func (s *tunConn) close() { func (s *tunConn) close() {
s.tun.mutex.Lock() s.tun.RecvFrom(s, s._close_from_tun)
defer s.tun.mutex.Unlock()
s._close_nomutex()
} }
func (s *tunConn) _close_nomutex() { func (s *tunConn) _close_from_tun() {
s.conn.Close() s.conn.Close()
delete(s.tun.addrToConn, s.addr) delete(s.tun.addrToConn, s.addr)
delete(s.tun.subnetToConn, s.snet) delete(s.tun.subnetToConn, s.snet)
@ -118,6 +116,12 @@ func (s *tunConn) _read(bs []byte) (err error) {
return return
} }
func (s *tunConn) writeFrom(from phony.Actor, bs []byte) {
s.RecvFrom(from, func() {
s._write(bs)
})
}
func (s *tunConn) _write(bs []byte) (err error) { func (s *tunConn) _write(bs []byte) (err error) {
select { select {
case <-s.stop: case <-s.stop:

View File

@ -90,14 +90,12 @@ func (w *tunWriter) _write(b []byte) {
util.PutBytes(b) util.PutBytes(b)
} }
if err != nil { if err != nil {
w.tun.mutex.Lock() w.tun.RecvFrom(w, func() {
open := w.tun.isOpen if !w.tun.isOpen {
w.tun.mutex.Unlock()
if !open {
return
}
w.tun.log.Errorln("TUN/TAP iface write error:", err) w.tun.log.Errorln("TUN/TAP iface write error:", err)
} }
})
}
if written != n { if written != n {
w.tun.log.Errorln("TUN/TAP iface write mismatch:", written, "bytes written vs", n, "bytes given") w.tun.log.Errorln("TUN/TAP iface write mismatch:", written, "bytes written vs", n, "bytes given")
} }
@ -221,7 +219,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
} }
// Do we have an active connection for this node address? // Do we have an active connection for this node address?
var dstNodeID, dstNodeIDMask *crypto.NodeID var dstNodeID, dstNodeIDMask *crypto.NodeID
tun.mutex.RLock()
session, isIn := tun.addrToConn[dstAddr] session, isIn := tun.addrToConn[dstAddr]
if !isIn || session == nil { if !isIn || session == nil {
session, isIn = tun.subnetToConn[dstSnet] session, isIn = tun.subnetToConn[dstSnet]
@ -235,7 +232,6 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
} }
} }
} }
tun.mutex.RUnlock()
// If we don't have a connection then we should open one // If we don't have a connection then we should open one
if !isIn || session == nil { if !isIn || session == nil {
// Check we haven't been given empty node ID, really this shouldn't ever // Check we haven't been given empty node ID, really this shouldn't ever
@ -243,45 +239,37 @@ func (tun *TunAdapter) _handlePacket(recvd []byte, err error) {
if dstNodeID == nil || dstNodeIDMask == nil { if dstNodeID == nil || dstNodeIDMask == nil {
panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen")
} }
// Dial to the remote node
go func() {
// FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes
tun.mutex.Lock()
_, known := tun.dials[*dstNodeID] _, known := tun.dials[*dstNodeID]
tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs) tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs)
for len(tun.dials[*dstNodeID]) > 32 { for len(tun.dials[*dstNodeID]) > 32 {
util.PutBytes(tun.dials[*dstNodeID][0]) util.PutBytes(tun.dials[*dstNodeID][0])
tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:]
} }
tun.mutex.Unlock() if !known {
if known { go func() {
return
}
var tc *tunConn
if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil {
tun.RecvFrom(nil, func() {
// We've been given a connection so prepare the session wrapper // We've been given a connection so prepare the session wrapper
if tc, err = tun.wrap(conn); err != nil { packets := tun.dials[*dstNodeID]
delete(tun.dials, *dstNodeID)
var tc *tunConn
var err error
if tc, err = tun._wrap(conn); err != nil {
// Something went wrong when storing the connection, typically that // Something went wrong when storing the connection, typically that
// something already exists for this address or subnet // something already exists for this address or subnet
tun.log.Debugln("TUN/TAP iface wrap:", err) tun.log.Debugln("TUN/TAP iface wrap:", err)
return
} }
}
tun.mutex.Lock()
packets := tun.dials[*dstNodeID]
delete(tun.dials, *dstNodeID)
tun.mutex.Unlock()
if tc != nil {
for _, packet := range packets { for _, packet := range packets {
p := packet // Possibly required because of how range tc.writeFrom(nil, packet)
<-tc.SyncExec(func() { tc._write(p) })
} }
})
} }
}() }()
// While the dial is going on we can't do much else }
return
} }
// If we have a connection now, try writing to it // If we have a connection now, try writing to it
if isIn && session != nil { if isIn && session != nil {
session.RecvFrom(tun, func() { session._write(bs) }) session.writeFrom(tun, bs)
} }
} }

View File

@ -13,7 +13,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync" //"sync"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
"github.com/gologme/log" "github.com/gologme/log"
@ -48,7 +48,7 @@ type TunAdapter struct {
mtu int mtu int
iface *water.Interface iface *water.Interface
phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below phony.Inbox // Currently only used for _handlePacket from the reader, TODO: all the stuff that currently needs a mutex below
mutex sync.RWMutex // Protects the below //mutex sync.RWMutex // Protects the below
addrToConn map[address.Address]*tunConn addrToConn map[address.Address]*tunConn
subnetToConn map[address.Subnet]*tunConn subnetToConn map[address.Subnet]*tunConn
dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes
@ -122,8 +122,16 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener
} }
// Start the setup process for the TUN/TAP adapter. If successful, starts the // Start the setup process for the TUN/TAP adapter. If successful, starts the
// read/write goroutines to handle packets on that interface. // reader actor to handle packets on that interface.
func (tun *TunAdapter) Start() error { func (tun *TunAdapter) Start() error {
var err error
<-tun.SyncExec(func() {
err = tun._start()
})
return err
}
func (tun *TunAdapter) _start() error {
current := tun.config.GetCurrent() current := tun.config.GetCurrent()
if tun.config == nil || tun.listener == nil || tun.dialer == nil { if tun.config == nil || tun.listener == nil || tun.dialer == nil {
return errors.New("No configuration available to TUN/TAP") return errors.New("No configuration available to TUN/TAP")
@ -150,10 +158,8 @@ func (tun *TunAdapter) Start() error {
tun.log.Debugln("Not starting TUN/TAP as ifname is none or dummy") tun.log.Debugln("Not starting TUN/TAP as ifname is none or dummy")
return nil return nil
} }
tun.mutex.Lock()
tun.isOpen = true tun.isOpen = true
tun.reconfigure = make(chan chan error) tun.reconfigure = make(chan chan error)
tun.mutex.Unlock()
go func() { go func() {
for { for {
e := <-tun.reconfigure e := <-tun.reconfigure
@ -173,6 +179,14 @@ func (tun *TunAdapter) Start() error {
// Start the setup process for the TUN/TAP adapter. If successful, starts the // Start the setup process for the TUN/TAP adapter. If successful, starts the
// read/write goroutines to handle packets on that interface. // read/write goroutines to handle packets on that interface.
func (tun *TunAdapter) Stop() error { func (tun *TunAdapter) Stop() error {
var err error
<-tun.SyncExec(func() {
err = tun._stop()
})
return err
}
func (tun *TunAdapter) _stop() error {
tun.isOpen = false tun.isOpen = false
// TODO: we have nothing that cleanly stops all the various goroutines opened // TODO: we have nothing that cleanly stops all the various goroutines opened
// by TUN/TAP, e.g. readers/writers, sessions // by TUN/TAP, e.g. readers/writers, sessions
@ -219,15 +233,17 @@ func (tun *TunAdapter) handler() error {
tun.log.Errorln("TUN/TAP connection accept error:", err) tun.log.Errorln("TUN/TAP connection accept error:", err)
return err return err
} }
if _, err := tun.wrap(conn); err != nil { <-tun.SyncExec(func() {
if _, err := tun._wrap(conn); err != nil {
// Something went wrong when storing the connection, typically that // Something went wrong when storing the connection, typically that
// something already exists for this address or subnet // something already exists for this address or subnet
tun.log.Debugln("TUN/TAP handler wrap:", err) tun.log.Debugln("TUN/TAP handler wrap:", err)
} }
})
} }
} }
func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) { func (tun *TunAdapter) _wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
// Prepare a session wrapper for the given connection // Prepare a session wrapper for the given connection
s := tunConn{ s := tunConn{
tun: tun, tun: tun,
@ -240,17 +256,15 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
s.addr = *address.AddrForNodeID(&remoteNodeID) s.addr = *address.AddrForNodeID(&remoteNodeID)
s.snet = *address.SubnetForNodeID(&remoteNodeID) s.snet = *address.SubnetForNodeID(&remoteNodeID)
// Work out if this is already a destination we already know about // Work out if this is already a destination we already know about
tun.mutex.Lock()
defer tun.mutex.Unlock()
atc, aok := tun.addrToConn[s.addr] atc, aok := tun.addrToConn[s.addr]
stc, sok := tun.subnetToConn[s.snet] stc, sok := tun.subnetToConn[s.snet]
// If we know about a connection for this destination already then assume it // If we know about a connection for this destination already then assume it
// is no longer valid and close it // is no longer valid and close it
if aok { if aok {
atc._close_nomutex() atc._close_from_tun()
err = errors.New("replaced connection for address") err = errors.New("replaced connection for address")
} else if sok { } else if sok {
stc._close_nomutex() stc._close_from_tun()
err = errors.New("replaced connection for subnet") err = errors.New("replaced connection for subnet")
} }
// Save the session wrapper so that we can look it up quickly next time // Save the session wrapper so that we can look it up quickly next time