5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 17:51:35 +00:00

Merge pull request #475 from Arceliar/misc

Misc
This commit is contained in:
Neil Alexander 2019-07-29 20:24:49 +01:00 committed by GitHub
commit 853054eb62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 83 additions and 136 deletions

5
go.sum
View File

@ -18,25 +18,30 @@ github.com/yggdrasil-network/water v0.0.0-20190719211521-a76871ea954b/go.mod h1:
github.com/yggdrasil-network/water v0.0.0-20190719213007-b160316e362e/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190719213007-b160316e362e/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190720101301-5db94379a5eb/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190720101301-5db94379a5eb/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190720145626-28ccb9101d55/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190720145626-28ccb9101d55/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a h1:mQ0mPD+dyB/vaDPyVkCBiXUQu9Or7/cRSTjPlV8tXvw=
github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs= github.com/yggdrasil-network/water v0.0.0-20190725073841-250edb919f8a/go.mod h1:R0SBCsugm+Sf1katgTb2t7GXMm+nRIv43tM4VDZbaOs=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20181207154023-610586996380 h1:zPQexyRtNYBc7bcHmehl1dH6TB3qn8zytv8cBGLDNY0= golang.org/x/net v0.0.0-20181207154023-610586996380 h1:zPQexyRtNYBc7bcHmehl1dH6TB3qn8zytv8cBGLDNY0=
golang.org/x/net v0.0.0-20181207154023-610586996380/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181207154023-610586996380/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo=
golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190719005602-e377ae9d6386/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=

View File

@ -110,20 +110,10 @@ func (tun *TunAdapter) writer() error {
} }
} }
func (tun *TunAdapter) reader() error { // Run in a separate goroutine by the reader
recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH) // Does all of the per-packet ICMP checks, passes packets to the right Conn worker
for { func (tun *TunAdapter) readerPacketHandler(ch chan []byte) {
// Wait for a packet to be delivered to us through the TUN/TAP adapter for recvd := range ch {
n, err := tun.iface.Read(recvd)
if err != nil {
if !tun.isOpen {
return err
}
panic(err)
}
if n == 0 {
continue
}
// If it's a TAP adapter, update the buffer slice so that we no longer // If it's a TAP adapter, update the buffer slice so that we no longer
// include the ethernet headers // include the ethernet headers
offset := 0 offset := 0
@ -137,8 +127,7 @@ func (tun *TunAdapter) reader() error {
} }
// Offset the buffer from now on so that we can ignore ethernet frames if // Offset the buffer from now on so that we can ignore ethernet frames if
// they are present // they are present
bs := recvd[offset : offset+n] bs := recvd[offset:]
n -= offset
// If we detect an ICMP packet then hand it to the ICMPv6 module // If we detect an ICMP packet then hand it to the ICMPv6 module
if bs[6] == 58 { if bs[6] == 58 {
// Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full // Found an ICMPv6 packet - we need to make sure to give ICMPv6 the full
@ -150,6 +139,8 @@ func (tun *TunAdapter) reader() error {
continue continue
} }
} }
// Shift forward to avoid leaking bytes off the front of the slide when we eventually store it
bs = append(recvd[:0], bs...)
// From the IP header, work out what our source and destination addresses // From the IP header, work out what our source and destination addresses
// and node IDs are. We will need these in order to work out where to send // and node IDs are. We will need these in order to work out where to send
// the packet // the packet
@ -159,6 +150,7 @@ func (tun *TunAdapter) reader() error {
var dstNodeIDMask *crypto.NodeID var dstNodeIDMask *crypto.NodeID
var dstSnet address.Subnet var dstSnet address.Subnet
var addrlen int var addrlen int
n := len(bs)
// Check the IP protocol - if it doesn't match then we drop the packet and // Check the IP protocol - if it doesn't match then we drop the packet and
// do nothing with it // do nothing with it
if bs[0]&0xf0 == 0x60 { if bs[0]&0xf0 == 0x60 {
@ -240,12 +232,11 @@ func (tun *TunAdapter) reader() error {
panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen")
} }
// Dial to the remote node // Dial to the remote node
packet := append(util.GetBytes(), bs[:n]...)
go func() { go func() {
// FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes
tun.mutex.Lock() tun.mutex.Lock()
_, known := tun.dials[*dstNodeID] _, known := tun.dials[*dstNodeID]
tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], bs)
for len(tun.dials[*dstNodeID]) > 32 { for len(tun.dials[*dstNodeID]) > 32 {
util.PutBytes(tun.dials[*dstNodeID][0]) util.PutBytes(tun.dials[*dstNodeID][0])
tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:]
@ -283,12 +274,33 @@ func (tun *TunAdapter) reader() error {
} }
// If we have a connection now, try writing to it // If we have a connection now, try writing to it
if isIn && session != nil { if isIn && session != nil {
packet := append(util.GetBytes(), bs[:n]...)
select { select {
case session.send <- packet: case session.send <- bs:
default: default:
util.PutBytes(packet) util.PutBytes(bs)
} }
} }
} }
} }
func (tun *TunAdapter) reader() error {
recvd := make([]byte, 65535+tun_ETHER_HEADER_LENGTH)
toWorker := make(chan []byte, 32)
defer close(toWorker)
go tun.readerPacketHandler(toWorker)
for {
// Wait for a packet to be delivered to us through the TUN/TAP adapter
n, err := tun.iface.Read(recvd)
if err != nil {
if !tun.isOpen {
return err
}
panic(err)
}
if n == 0 {
continue
}
bs := append(util.GetBytes(), recvd[:n]...)
toWorker <- bs
}
}

View File

@ -13,33 +13,25 @@ type Cancellation interface {
Error() error Error() error
} }
var CancellationFinalized = errors.New("finalizer called")
var CancellationTimeoutError = errors.New("timeout")
func CancellationFinalizer(c Cancellation) { func CancellationFinalizer(c Cancellation) {
c.Cancel(errors.New("finalizer called")) c.Cancel(CancellationFinalized)
} }
type cancellation struct { type cancellation struct {
signal chan error
cancel chan struct{} cancel chan struct{}
errMtx sync.RWMutex mutex sync.RWMutex
err error err error
} done bool
func (c *cancellation) worker() {
// Launch this in a separate goroutine when creating a cancellation
err := <-c.signal
c.errMtx.Lock()
c.err = err
c.errMtx.Unlock()
close(c.cancel)
} }
func NewCancellation() Cancellation { func NewCancellation() Cancellation {
c := cancellation{ c := cancellation{
signal: make(chan error),
cancel: make(chan struct{}), cancel: make(chan struct{}),
} }
runtime.SetFinalizer(&c, CancellationFinalizer) runtime.SetFinalizer(&c, CancellationFinalizer)
go c.worker()
return &c return &c
} }
@ -48,18 +40,22 @@ func (c *cancellation) Finished() <-chan struct{} {
} }
func (c *cancellation) Cancel(err error) error { func (c *cancellation) Cancel(err error) error {
select { c.mutex.Lock()
case c.signal <- err: defer c.mutex.Unlock()
if c.done {
return c.err
} else {
c.err = err
c.done = true
close(c.cancel)
return nil return nil
case <-c.cancel:
return c.Error()
} }
} }
func (c *cancellation) Error() error { func (c *cancellation) Error() error {
c.errMtx.RLock() c.mutex.RLock()
err := c.err err := c.err
c.errMtx.RUnlock() c.mutex.RUnlock()
return err return err
} }
@ -75,8 +71,6 @@ func CancellationChild(parent Cancellation) Cancellation {
return child return child
} }
var CancellationTimeoutError = errors.New("timeout")
func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation { func CancellationWithTimeout(parent Cancellation, timeout time.Duration) Cancellation {
child := CancellationChild(parent) child := CancellationChild(parent)
go func() { go func() {

View File

@ -3,6 +3,7 @@ package util
// These are misc. utility functions that didn't really fit anywhere else // These are misc. utility functions that didn't really fit anywhere else
import "runtime" import "runtime"
import "sync"
import "time" import "time"
// A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere. // A wrapper around runtime.Gosched() so it doesn't need to be imported elsewhere.
@ -21,29 +22,27 @@ func UnlockThread() {
} }
// This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops. // This is used to buffer recently used slices of bytes, to prevent allocations in the hot loops.
// It's used like a sync.Pool, but with a fixed size and typechecked without type casts to/from interface{} (which were making the profiles look ugly). var byteStoreMutex sync.Mutex
var byteStore chan []byte var byteStore [][]byte
func init() { // Gets an empty slice from the byte store.
byteStore = make(chan []byte, 32)
}
// Gets an empty slice from the byte store, if one is available, or else returns a new nil slice.
func GetBytes() []byte { func GetBytes() []byte {
select { byteStoreMutex.Lock()
case bs := <-byteStore: defer byteStoreMutex.Unlock()
return bs[:0] if len(byteStore) > 0 {
default: var bs []byte
bs, byteStore = byteStore[len(byteStore)-1][:0], byteStore[:len(byteStore)-1]
return bs
} else {
return nil return nil
} }
} }
// Puts a slice in the store, if there's room, or else returns and lets the slice get collected. // Puts a slice in the store.
func PutBytes(bs []byte) { func PutBytes(bs []byte) {
select { byteStoreMutex.Lock()
case byteStore <- bs: defer byteStoreMutex.Unlock()
default: byteStore = append(byteStore, bs)
}
} }
// This is a workaround to go's broken timer implementation // This is a workaround to go's broken timer implementation

View File

@ -230,7 +230,7 @@ func (c *Core) GetSessions() []Session {
skip = true skip = true
} }
}() }()
sinfo.doWorker(workerFunc) sinfo.doFunc(workerFunc)
}() }()
if skip { if skip {
continue continue

View File

@ -152,11 +152,9 @@ func (c *Conn) Read(b []byte) (int, error) {
if !ok { if !ok {
return 0, ConnError{errors.New("session closed"), false, false, true, 0} return 0, ConnError{errors.New("session closed"), false, false, true, 0}
} }
defer util.PutBytes(p.Payload)
var err error var err error
done := make(chan struct{}) sessionFunc := func() {
workerFunc := func() { defer util.PutBytes(p.Payload)
defer close(done)
// If the nonce is bad then drop the packet and return an error // If the nonce is bad then drop the packet and return an error
if !sinfo.nonceIsOK(&p.Nonce) { if !sinfo.nonceIsOK(&p.Nonce) {
err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0}
@ -176,33 +174,7 @@ func (c *Conn) Read(b []byte) (int, error) {
sinfo.time = time.Now() sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs)) sinfo.bytesRecvd += uint64(len(bs))
} }
// Hand over to the session worker sinfo.doFunc(sessionFunc)
defer func() {
if recover() != nil {
err = ConnError{errors.New("read failed, session already closed"), false, false, true, 0}
close(done)
}
}() // In case we're racing with a close
// Send to worker
select {
case sinfo.worker <- workerFunc:
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
} else {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
}
// Wait for the worker to finish
select {
case <-done: // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return 0, ConnError{errors.New("read timeout"), true, false, false, 0}
} else {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
}
// Something went wrong in the session worker so abort // Something went wrong in the session worker so abort
if err != nil { if err != nil {
if ce, ok := err.(*ConnError); ok && ce.Temporary() { if ce, ok := err.(*ConnError); ok && ce.Temporary() {
@ -223,10 +195,8 @@ func (c *Conn) Read(b []byte) (int, error) {
func (c *Conn) Write(b []byte) (bytesWritten int, err error) { func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
sinfo := c.session sinfo := c.session
var packet []byte var packet []byte
done := make(chan struct{})
written := len(b) written := len(b)
workerFunc := func() { sessionFunc := func() {
defer close(done)
// Does the packet exceed the permitted size for the session? // Does the packet exceed the permitted size for the session?
if uint16(len(b)) > sinfo.getMTU() { if uint16(len(b)) > sinfo.getMTU() {
written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())}
@ -273,27 +243,7 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
default: // Don't do anything, to keep traffic throttled default: // Don't do anything, to keep traffic throttled
} }
} }
// Set up a timer so this doesn't block forever sinfo.doFunc(sessionFunc)
cancel := c.getDeadlineCancellation(&c.writeDeadline)
defer cancel.Cancel(nil)
// Hand over to the session worker
defer func() {
if recover() != nil {
err = ConnError{errors.New("write failed, session already closed"), false, false, true, 0}
close(done)
}
}() // In case we're racing with a close
select { // Send to worker
case sinfo.worker <- workerFunc:
case <-cancel.Finished():
if cancel.Error() == util.CancellationTimeoutError {
return 0, ConnError{errors.New("write timeout"), true, false, false, 0}
} else {
return 0, ConnError{errors.New("session closed"), false, false, true, 0}
}
}
// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
<-done
// Give the packet to the router // Give the packet to the router
if written > 0 { if written > 0 {
sinfo.core.router.out(packet) sinfo.core.router.out(packet)

View File

@ -16,6 +16,7 @@ import (
// All the information we know about an active session. // All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct { type sessionInfo struct {
mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session
core *Core // core *Core //
reconfigure chan chan error // reconfigure chan chan error //
theirAddr address.Address // theirAddr address.Address //
@ -43,24 +44,14 @@ type sessionInfo struct {
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session bytesRecvd uint64 // Bytes of real traffic received in this session
worker chan func() // Channel to send work to the session worker
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
} }
func (sinfo *sessionInfo) doWorker(f func()) { func (sinfo *sessionInfo) doFunc(f func()) {
done := make(chan struct{}) sinfo.mutex.Lock()
sinfo.worker <- func() { defer sinfo.mutex.Unlock()
f() f()
close(done)
}
<-done
}
func (sinfo *sessionInfo) workerMain() {
for f := range sinfo.worker {
f()
}
} }
// Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -231,11 +222,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle() sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.worker = make(chan func(), 1)
sinfo.recv = make(chan *wire_trafficPacket, 32) sinfo.recv = make(chan *wire_trafficPacket, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go sinfo.workerMain()
return &sinfo return &sinfo
} }
@ -267,14 +256,12 @@ func (ss *sessions) cleanup() {
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
// Closes a session, removing it from sessions maps and killing the worker goroutine. // Closes a session, removing it from sessions maps.
func (sinfo *sessionInfo) close() { func (sinfo *sessionInfo) close() {
if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo { if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo {
delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub)
} }
defer func() { recover() }()
close(sinfo.worker)
} }
// Returns a session ping appropriate for the given session info. // Returns a session ping appropriate for the given session info.
@ -372,7 +359,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
} }
ss.listenerMutex.Unlock() ss.listenerMutex.Unlock()
} }
sinfo.doWorker(func() { sinfo.doFunc(func() {
// Update the session // Update the session
if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ if !sinfo.update(ping) { /*panic("Should not happen in testing")*/
return return
@ -426,7 +413,7 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) reset() { func (ss *sessions) reset() {
for _, sinfo := range ss.sinfos { for _, sinfo := range ss.sinfos {
sinfo.doWorker(func() { sinfo.doFunc(func() {
sinfo.reset = true sinfo.reset = true
}) })
} }