5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 11:51:37 +00:00

start migrating Conn to be an actor

This commit is contained in:
Arceliar 2019-08-23 23:36:00 -05:00
parent cac3444d9a
commit 6ecbc439f0

View File

@ -3,12 +3,12 @@ package yggdrasil
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony"
)
// ConnError implements the net.Error interface
@ -54,10 +54,10 @@ func (e *ConnError) Closed() bool {
}
type Conn struct {
phony.Actor
core *Core
readDeadline atomic.Value // time.Time // TODO timer
writeDeadline atomic.Value // time.Time // TODO timer
mutex sync.RWMutex // protects the below
readDeadline *time.Time
writeDeadline *time.Time
nodeID *crypto.NodeID
nodeMask *crypto.NodeID
session *sessionInfo
@ -75,9 +75,9 @@ func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session
}
func (c *Conn) String() string {
c.mutex.RLock()
defer c.mutex.RUnlock()
return fmt.Sprintf("conn=%p", c)
var s string
<-c.SyncExec(func() { s = fmt.Sprintf("conn=%p", c) })
return s
}
// This should never be called from the router goroutine, used in the dial functions
@ -137,10 +137,10 @@ func (c *Conn) doSearch() {
go c.core.router.doAdmin(routerWork)
}
func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation, bool) {
if deadline, ok := value.Load().(time.Time); ok {
func (c *Conn) _getDeadlineCancellation(t *time.Time) (util.Cancellation, bool) {
if t != nil {
// A deadline is set, so return a Cancellation that uses it
c := util.CancellationWithDeadline(c.session.cancel, deadline)
c := util.CancellationWithDeadline(c.session.cancel, *t)
return c, true
} else {
// No deadline was set, so just return the existinc cancellation and a dummy value
@ -150,7 +150,9 @@ func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation,
// Used internally by Read, the caller is responsible for util.PutBytes when they're done.
func (c *Conn) ReadNoCopy() ([]byte, error) {
cancel, doCancel := c.getDeadlineCancellation(&c.readDeadline)
var cancel util.Cancellation
var doCancel bool
<-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.readDeadline) })
if doCancel {
defer cancel.Cancel(nil)
}
@ -210,7 +212,9 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
}
c.session.doFunc(sessionFunc)
if err == nil {
cancel, doCancel := c.getDeadlineCancellation(&c.writeDeadline)
var cancel util.Cancellation
var doCancel bool
<-c.SyncExec(func() { cancel, doCancel = c._getDeadlineCancellation(c.writeDeadline) })
if doCancel {
defer cancel.Cancel(nil)
}
@ -240,14 +244,14 @@ func (c *Conn) Write(b []byte) (int, error) {
}
func (c *Conn) Close() (err error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.session != nil {
// Close the session, if it hasn't been closed already
if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
<-c.SyncExec(func() {
if c.session != nil {
// Close the session, if it hasn't been closed already
if e := c.session.cancel.Cancel(errors.New("connection closed")); e != nil {
err = ConnError{errors.New("close failed, session already closed"), false, false, true, 0}
}
}
}
})
return
}
@ -256,9 +260,9 @@ func (c *Conn) LocalAddr() crypto.NodeID {
}
func (c *Conn) RemoteAddr() crypto.NodeID {
c.mutex.RLock()
defer c.mutex.RUnlock()
return *c.nodeID
var n crypto.NodeID
<-c.SyncExec(func() { n = *c.nodeID })
return n
}
func (c *Conn) SetDeadline(t time.Time) error {
@ -268,11 +272,11 @@ func (c *Conn) SetDeadline(t time.Time) error {
}
func (c *Conn) SetReadDeadline(t time.Time) error {
c.readDeadline.Store(t)
<-c.SyncExec(func() { c.readDeadline = &t })
return nil
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
c.writeDeadline.Store(t)
<-c.SyncExec(func() { c.writeDeadline = &t })
return nil
}