5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2025-01-22 20:44:38 +00:00

Break deadlock by creating session recv queue when session is created instead of repointing at search completion, also make expired atomic

This commit is contained in:
Neil Alexander 2019-04-22 11:20:35 +01:00
parent 5a02e2ff44
commit 47eb2fc47f
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
3 changed files with 32 additions and 44 deletions

View File

@ -193,12 +193,13 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error {
delete(tun.conns, remoteNodeID)
tun.mutex.Unlock()
}()
tun.log.Debugln("Start connection reader for", conn.String())
b := make([]byte, 65535)
for {
n, err := conn.Read(b)
if err != nil {
tun.log.Errorln("TUN/TAP conn read error:", err)
return err
continue
}
if n == 0 {
continue
@ -209,7 +210,7 @@ func (tun *TunAdapter) connReader(conn *yggdrasil.Conn) error {
continue
}
if w != n {
tun.log.Errorln("TUN/TAP iface write len didn't match conn read len")
tun.log.Errorln("TUN/TAP iface write mismatch:", w, "bytes written vs", n, "bytes given")
continue
}
}
@ -220,7 +221,7 @@ func (tun *TunAdapter) ifaceReader() error {
for {
n, err := tun.iface.Read(bs)
if err != nil {
tun.log.Errorln("TUN/TAP iface read error:", err)
continue
}
// Look up if the dstination address is somewhere we already have an
// open connection to
@ -253,6 +254,10 @@ func (tun *TunAdapter) ifaceReader() error {
// Unknown address length or protocol
continue
}
if !dstAddr.IsValid() && !dstSnet.IsValid() {
// For now don't deal with any non-Yggdrasil ranges
continue
}
dstNodeID, dstNodeIDMask = dstAddr.GetNodeIDandMask()
// Do we have an active connection for this node ID?
tun.mutex.Lock()
@ -260,10 +265,11 @@ func (tun *TunAdapter) ifaceReader() error {
tun.mutex.Unlock()
w, err := conn.Write(bs)
if err != nil {
tun.log.Println("TUN/TAP conn write error:", err)
tun.log.Errorln("TUN/TAP conn write error:", err)
continue
}
if w != n {
tun.log.Errorln("TUN/TAP conn write mismatch:", w, "bytes written vs", n, "bytes given")
continue
}
} else {
@ -273,7 +279,7 @@ func (tun *TunAdapter) ifaceReader() error {
go tun.connReader(&conn)
} else {
tun.mutex.Unlock()
tun.log.Println("TUN/TAP dial error:", err)
tun.log.Errorln("TUN/TAP dial error:", err)
}
}

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
@ -17,9 +18,9 @@ type Conn struct {
recv chan *wire_trafficPacket // Eventually gets attached to session.recv
mutex *sync.RWMutex
session *sessionInfo
readDeadline time.Time // TODO timer
writeDeadline time.Time // TODO timer
expired bool
readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer
expired atomic.Value // bool
}
func (c *Conn) String() string {
@ -32,14 +33,12 @@ func (c *Conn) startSearch() {
if err != nil {
c.core.log.Debugln("DHT search failed:", err)
c.mutex.Lock()
c.expired = true
c.mutex.Unlock()
c.expired.Store(true)
return
}
if sinfo != nil {
c.mutex.Lock()
c.session = sinfo
c.session.recv = c.recv
c.nodeID, c.nodeMask = sinfo.theirAddr.GetNodeIDandMask()
c.mutex.Unlock()
}
@ -75,30 +74,20 @@ func (c *Conn) startSearch() {
}
func (c *Conn) Read(b []byte) (int, error) {
err := func() error {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.expired {
return errors.New("session is closed")
}
return nil
}()
if err != nil {
return 0, err
if e, ok := c.expired.Load().(bool); ok && e {
return 0, errors.New("session is closed")
}
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
select {
// TODO...
case p, ok := <-c.recv:
if !ok {
c.mutex.Lock()
c.expired = true
c.mutex.Unlock()
c.expired.Store(true)
return 0, errors.New("session is closed")
}
defer util.PutBytes(p.Payload)
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
var err error
sinfo.doWorker(func() {
if !sinfo.nonceIsOK(&p.Nonce) {
@ -131,19 +120,12 @@ func (c *Conn) Read(b []byte) (int, error) {
}
func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
var sinfo *sessionInfo
err = func() error {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.expired {
return errors.New("session is closed")
}
sinfo = c.session
return nil
}()
if err != nil {
return 0, err
if e, ok := c.expired.Load().(bool); ok && e {
return 0, errors.New("session is closed")
}
c.mutex.RLock()
sinfo := c.session
c.mutex.RUnlock()
if sinfo == nil {
c.core.router.doAdmin(func() {
c.startSearch()
@ -173,7 +155,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
}
func (c *Conn) Close() error {
c.expired = true
c.expired.Store(true)
c.session.close()
return nil
}
@ -195,11 +177,11 @@ func (c *Conn) SetDeadline(t time.Time) error {
}
func (c *Conn) SetReadDeadline(t time.Time) error {
c.readDeadline = t
c.readDeadline.Store(t)
return nil
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
c.writeDeadline = t
c.writeDeadline.Store(t)
return nil
}

View File

@ -321,6 +321,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.worker = make(chan func(), 1)
sinfo.recv = make(chan *wire_trafficPacket, 32)
ss.sinfos[sinfo.myHandle] = &sinfo
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
@ -480,12 +481,11 @@ func (ss *sessions) handlePing(ping *sessionPing) {
mutex: &sync.RWMutex{},
nodeID: crypto.GetNodeID(&sinfo.theirPermPub),
nodeMask: &crypto.NodeID{},
recv: make(chan *wire_trafficPacket, 32),
recv: sinfo.recv,
}
for i := range conn.nodeMask {
conn.nodeMask[i] = 0xFF
}
sinfo.recv = conn.recv
ss.listener.conn <- conn
} else {
ss.core.log.Debugln("Received new session but there is no listener, ignoring")