5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 19:01:50 +00:00

don't spam searches for unused connections. todo: timeout old connections somehow

This commit is contained in:
Arceliar 2019-05-23 20:27:52 -05:00
parent 70774fc3de
commit 5ea864869a
3 changed files with 46 additions and 37 deletions

View File

@ -3,6 +3,7 @@ package tuntap
import ( import (
"errors" "errors"
"github.com/yggdrasil-network/yggdrasil-go/src/address"
"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
) )
@ -10,11 +11,21 @@ import (
type tunConn struct { type tunConn struct {
tun *TunAdapter tun *TunAdapter
conn *yggdrasil.Conn conn *yggdrasil.Conn
addr address.Address
snet address.Subnet
send chan []byte send chan []byte
stop chan interface{} stop chan interface{}
} }
func (s *tunConn) close() { func (s *tunConn) close() {
s.tun.mutex.Lock()
s._close_nomutex()
s.tun.mutex.Unlock()
}
func (s *tunConn) _close_nomutex() {
delete(s.tun.addrToConn, s.addr)
delete(s.tun.subnetToConn, s.snet)
close(s.stop) close(s.stop)
} }
@ -32,6 +43,7 @@ func (s *tunConn) reader() error {
b := make([]byte, 65535) b := make([]byte, 65535)
for { for {
go func() { go func() {
// TODO read timeout and close
if n, err = s.conn.Read(b); err != nil { if n, err = s.conn.Read(b); err != nil {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err) s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err)
return return
@ -72,6 +84,7 @@ func (s *tunConn) writer() error {
if !ok { if !ok {
return errors.New("send closed") return errors.New("send closed")
} }
// TODO write timeout and close
if _, err := s.conn.Write(b); err != nil { if _, err := s.conn.Write(b); err != nil {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err) s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn write error:", err)
} }

View File

@ -236,26 +236,26 @@ func (tun *TunAdapter) wrap(conn *yggdrasil.Conn) (c *tunConn, err error) {
} }
// Get the remote address and subnet of the other side // Get the remote address and subnet of the other side
remoteNodeID := conn.RemoteAddr() remoteNodeID := conn.RemoteAddr()
remoteAddr := address.AddrForNodeID(&remoteNodeID) s.addr = *address.AddrForNodeID(&remoteNodeID)
remoteSubnet := address.SubnetForNodeID(&remoteNodeID) s.snet = *address.SubnetForNodeID(&remoteNodeID)
// Work out if this is already a destination we already know about // Work out if this is already a destination we already know about
tun.mutex.Lock() tun.mutex.Lock()
defer tun.mutex.Unlock() defer tun.mutex.Unlock()
atc, aok := tun.addrToConn[*remoteAddr] atc, aok := tun.addrToConn[s.addr]
stc, sok := tun.subnetToConn[*remoteSubnet] stc, sok := tun.subnetToConn[s.snet]
// If we know about a connection for this destination already then assume it // If we know about a connection for this destination already then assume it
// is no longer valid and close it // is no longer valid and close it
if aok { if aok {
atc.close() atc._close_nomutex()
err = errors.New("replaced connection for address") err = errors.New("replaced connection for address")
} else if sok { } else if sok {
stc.close() stc._close_nomutex()
err = errors.New("replaced connection for subnet") err = errors.New("replaced connection for subnet")
} }
// Save the session wrapper so that we can look it up quickly next time // Save the session wrapper so that we can look it up quickly next time
// we receive a packet through the interface for this address // we receive a packet through the interface for this address
tun.addrToConn[*remoteAddr] = &s tun.addrToConn[s.addr] = &s
tun.subnetToConn[*remoteSubnet] = &s tun.subnetToConn[s.snet] = &s
// Start the connection goroutines // Start the connection goroutines
go s.reader() go s.reader()
go s.writer() go s.writer()

View File

@ -37,6 +37,7 @@ type Conn struct {
writeDeadline atomic.Value // time.Time // TODO timer writeDeadline atomic.Value // time.Time // TODO timer
searching atomic.Value // bool searching atomic.Value // bool
searchwait chan struct{} // Never reset this, it's only used for the initial search searchwait chan struct{} // Never reset this, it's only used for the initial search
writebuf [][]byte // Packets to be sent if/when the search finishes
} }
// TODO func NewConn() that initializes additional fields as needed // TODO func NewConn() that initializes additional fields as needed
@ -60,23 +61,13 @@ func (c *Conn) String() string {
func (c *Conn) startSearch() { func (c *Conn) startSearch() {
// The searchCompleted callback is given to the search // The searchCompleted callback is given to the search
searchCompleted := func(sinfo *sessionInfo, err error) { searchCompleted := func(sinfo *sessionInfo, err error) {
defer c.searching.Store(false)
// If the search failed for some reason, e.g. it hit a dead end or timed // If the search failed for some reason, e.g. it hit a dead end or timed
// out, then do nothing // out, then do nothing
if err != nil { if err != nil {
c.core.log.Debugln(c.String(), "DHT search failed:", err) c.core.log.Debugln(c.String(), "DHT search failed:", err)
go func() {
time.Sleep(time.Second)
c.mutex.RLock()
closed := c.closed
c.mutex.RUnlock()
if !closed {
// Restart the search, or else Write can stay blocked forever
c.core.router.admin <- c.startSearch
}
}()
return return
} }
defer c.searching.Store(false)
// Take the connection mutex // Take the connection mutex
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
@ -102,6 +93,16 @@ func (c *Conn) startSearch() {
// Things were closed before the search returned // Things were closed before the search returned
// Go ahead and close it again to make sure the session is cleaned up // Go ahead and close it again to make sure the session is cleaned up
go c.Close() go c.Close()
} else {
// Send any messages we may have buffered
var msgs [][]byte
msgs, c.writebuf = c.writebuf, nil
go func() {
for _, msg := range msgs {
c.Write(msg)
util.PutBytes(msg)
}
}()
} }
} }
// doSearch will be called below in response to one or more conditions // doSearch will be called below in response to one or more conditions
@ -238,8 +239,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
c.mutex.RLock() c.mutex.RLock()
sinfo := c.session sinfo := c.session
c.mutex.RUnlock() c.mutex.RUnlock()
timer := getDeadlineTimer(&c.writeDeadline)
defer util.TimerStop(timer)
// If the session doesn't exist, or isn't initialised (which probably means // If the session doesn't exist, or isn't initialised (which probably means
// that the search didn't complete successfully) then we may need to wait for // that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again // the search to complete or start the search again
@ -249,22 +248,15 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
// No search was already taking place so start a new one // No search was already taking place so start a new one
c.core.router.doAdmin(c.startSearch) c.core.router.doAdmin(c.startSearch)
} }
// Wait for the search to complete // Buffer the packet to be sent if/when the search is finished
select { c.mutex.Lock()
case <-c.searchwait: defer c.mutex.Unlock()
case <-timer.C: c.writebuf = append(c.writebuf, append(util.GetBytes(), b...))
return 0, ConnError{errors.New("Timeout"), true, false} for len(c.writebuf) > 32 {
} util.PutBytes(c.writebuf[0])
// Retrieve our session info again c.writebuf = c.writebuf[1:]
c.mutex.RLock()
sinfo = c.session
c.mutex.RUnlock()
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0, errors.New("search failed")
} }
return len(b), nil
} }
var packet []byte var packet []byte
done := make(chan struct{}) done := make(chan struct{})
@ -283,13 +275,17 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
packet = p.encode() packet = p.encode()
sinfo.bytesSent += uint64(len(b)) sinfo.bytesSent += uint64(len(b))
} }
// Set up a timer so this doesn't block forever
timer := getDeadlineTimer(&c.writeDeadline)
defer util.TimerStop(timer)
// Hand over to the session worker // Hand over to the session worker
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("Timeout"), true, false} return 0, ConnError{errors.New("Timeout"), true, false}
} }
<-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
<-done
// Give the packet to the router // Give the packet to the router
sinfo.core.router.out(packet) sinfo.core.router.out(packet)
// Finally return the number of bytes we wrote // Finally return the number of bytes we wrote