5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 10:41:40 +00:00

Try to improve handling of timeouts

This commit is contained in:
Neil Alexander 2019-07-17 11:13:53 +01:00
parent 2532cd77e4
commit 747b50bb7c
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
2 changed files with 21 additions and 13 deletions

View File

@ -52,26 +52,29 @@ func (s *tunConn) reader() error {
default: default:
} }
s.tun.log.Debugln("Starting conn reader for", s) s.tun.log.Debugln("Starting conn reader for", s)
defer s.tun.log.Debugln("Stopping conn reader for", s)
var n int var n int
var err error var err error
read := make(chan bool) read := make(chan bool)
b := make([]byte, 65535) b := make([]byte, 65535)
go func() { go func() {
s.tun.log.Debugln("Starting conn reader helper for", s) s.tun.log.Debugln("Starting conn reader helper for", s)
defer s.tun.log.Debugln("Stopping conn reader helper for", s)
for { for {
s.conn.SetReadDeadline(time.Now().Add(tunConnTimeout)) s.conn.SetReadDeadline(time.Now().Add(tunConnTimeout))
if n, err = s.conn.Read(b); err != nil { if n, err = s.conn.Read(b); err != nil {
s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err) s.tun.log.Errorln(s.conn.String(), "TUN/TAP conn read error:", err)
if e, eok := err.(yggdrasil.ConnError); eok { if e, eok := err.(yggdrasil.ConnError); eok {
s.tun.log.Debugln("Conn reader helper", s, "error:", e)
switch { switch {
case e.Temporary(): case e.Temporary():
fallthrough
case e.Timeout():
read <- false read <- false
continue continue
case e.Timeout(): case e.Closed():
s.tun.log.Debugln("Conn reader for helper", s, "timed out")
fallthrough fallthrough
default: default:
s.tun.log.Debugln("Stopping conn reader helper for", s)
s.close() s.close()
return return
} }
@ -94,7 +97,6 @@ func (s *tunConn) reader() error {
} }
s.stillAlive() // TODO? Only stay alive if we read >0 bytes? s.stillAlive() // TODO? Only stay alive if we read >0 bytes?
case <-s.stop: case <-s.stop:
s.tun.log.Debugln("Stopping conn reader for", s)
return nil return nil
} }
} }
@ -109,10 +111,10 @@ func (s *tunConn) writer() error {
default: default:
} }
s.tun.log.Debugln("Starting conn writer for", s) s.tun.log.Debugln("Starting conn writer for", s)
defer s.tun.log.Debugln("Stopping conn writer for", s)
for { for {
select { select {
case <-s.stop: case <-s.stop:
s.tun.log.Debugln("Stopping conn writer for", s)
return nil return nil
case b, ok := <-s.send: case b, ok := <-s.send:
if !ok { if !ok {

View File

@ -16,6 +16,7 @@ type ConnError struct {
error error
timeout bool timeout bool
temporary bool temporary bool
closed bool
maxsize int maxsize int
} }
@ -38,6 +39,11 @@ func (e *ConnError) PacketTooBig() (bool, int) {
return e.maxsize > 0, e.maxsize return e.maxsize > 0, e.maxsize
} }
// Closed returns if the session is already closed and is now unusable.
func (e *ConnError) Closed() bool {
return e.closed
}
type Conn struct { type Conn struct {
core *Core core *Core
nodeID *crypto.NodeID nodeID *crypto.NodeID
@ -122,11 +128,11 @@ func (c *Conn) Read(b []byte) (int, error) {
// Wait for some traffic to come through from the session // Wait for some traffic to come through from the session
select { select {
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, 0} return 0, ConnError{errors.New("timeout"), true, false, false, 0}
case p, ok := <-sinfo.recv: case p, ok := <-sinfo.recv:
// If the session is closed then do nothing // If the session is closed then do nothing
if !ok { if !ok {
return 0, errors.New("session is closed") return 0, ConnError{errors.New("session is closed"), false, false, true, 0}
} }
defer util.PutBytes(p.Payload) defer util.PutBytes(p.Payload)
var err error var err error
@ -135,7 +141,7 @@ func (c *Conn) Read(b []byte) (int, error) {
defer close(done) defer close(done)
// If the nonce is bad then drop the packet and return an error // If the nonce is bad then drop the packet and return an error
if !sinfo.nonceIsOK(&p.Nonce) { if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, 0} err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
return return
} }
// Decrypt the packet // Decrypt the packet
@ -144,7 +150,7 @@ func (c *Conn) Read(b []byte) (int, error) {
// Check if we were unable to decrypt the packet for some reason and // Check if we were unable to decrypt the packet for some reason and
// return an error if we couldn't // return an error if we couldn't
if !isOK { if !isOK {
err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, 0} err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0}
return return
} }
// Return the newly decrypted buffer back to the slice we were given // Return the newly decrypted buffer back to the slice we were given
@ -168,7 +174,7 @@ func (c *Conn) Read(b []byte) (int, error) {
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, 0} return 0, ConnError{errors.New("timeout"), true, false, false, 0}
} }
<-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff) <-done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
// Something went wrong in the session worker so abort // Something went wrong in the session worker so abort
@ -194,7 +200,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
defer close(done) defer close(done)
// Does the packet exceed the permitted size for the session? // Does the packet exceed the permitted size for the session?
if uint16(len(b)) > sinfo.getMTU() { if uint16(len(b)) > sinfo.getMTU() {
written, err = 0, ConnError{errors.New("packet too big"), true, false, int(sinfo.getMTU())} written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())}
return return
} }
// Encrypt the packet // Encrypt the packet
@ -244,14 +250,14 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
// Hand over to the session worker // Hand over to the session worker
defer func() { defer func() {
if recover() != nil { if recover() != nil {
err = errors.New("write failed, session already closed") err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0}
close(done) close(done)
} }
}() // In case we're racing with a close }() // In case we're racing with a close
select { // Send to worker select { // Send to worker
case sinfo.worker <- workerFunc: case sinfo.worker <- workerFunc:
case <-timer.C: case <-timer.C:
return 0, ConnError{errors.New("timeout"), true, false, 0} return 0, ConnError{errors.New("timeout"), true, false, false, 0}
} }
// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff) // Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
<-done <-done