5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-23 18:05:39 +00:00

Merge pull request #444 from Arceliar/dial

Dial fixes
This commit is contained in:
Arceliar 2019-06-29 16:17:47 -05:00 committed by GitHub
commit 43bcb9e154
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 12 deletions

View File

@ -89,12 +89,14 @@ func (c *Conn) search() error {
<-done <-done
c.session = sess c.session = sess
if c.session == nil && err == nil { if c.session == nil && err == nil {
panic("search failed but returend no error") panic("search failed but returned no error")
} }
if c.session != nil {
c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) c.nodeID = crypto.GetNodeID(&c.session.theirPermPub)
for i := range c.nodeMask { for i := range c.nodeMask {
c.nodeMask[i] = 0xFF c.nodeMask[i] = 0xFF
} }
}
return err return err
} else { } else {
return errors.New("search already exists") return errors.New("search already exists")
@ -218,8 +220,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
go func() { c.core.router.admin <- routerWork }() go func() { c.core.router.admin <- routerWork }()
} }
switch { switch {
case !sinfo.init:
sinfo.core.sessions.ping(sinfo)
case time.Since(sinfo.time) > 6*time.Second: case time.Since(sinfo.time) > 6*time.Second:
if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second {
// TODO double check that the above condition is correct // TODO double check that the above condition is correct
@ -227,6 +227,8 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) {
} else { } else {
sinfo.core.sessions.ping(sinfo) sinfo.core.sessions.ping(sinfo)
} }
case sinfo.reset && sinfo.pingTime.Before(sinfo.time):
sinfo.core.sessions.ping(sinfo)
default: // Don't do anything, to keep traffic throttled default: // Don't do anything, to keep traffic throttled
} }
} }

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
) )
@ -61,7 +62,16 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) {
func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) {
conn := newConn(d.core, nodeID, nodeMask, nil) conn := newConn(d.core, nodeID, nodeMask, nil)
if err := conn.search(); err != nil { if err := conn.search(); err != nil {
conn.Close()
return nil, err return nil, err
} }
t := time.NewTimer(6 * time.Second) // TODO use a context instead
defer t.Stop()
select {
case <-conn.session.init:
return conn, nil return conn, nil
case <-t.C:
conn.Close()
return nil, errors.New("session handshake timeout")
}
} }

View File

@ -119,7 +119,7 @@ func (r *router) mainLoop() {
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info) r.core.dht.insertPeer(info)
case <-r.reset: case <-r.reset:
r.core.sessions.resetInits() r.core.sessions.reset()
r.core.dht.reset() r.core.dht.reset()
case <-ticker.C: case <-ticker.C:
{ {

View File

@ -39,12 +39,13 @@ type sessionInfo struct {
pingTime time.Time // time the first ping was sent since the last received packet pingTime time.Time // time the first ping was sent since the last received packet
pingSend time.Time // time the last ping was sent pingSend time.Time // time the last ping was sent
coords []byte // coords of destination coords []byte // coords of destination
init bool // Reset if coords change reset bool // reset if coords change
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session bytesRecvd uint64 // Bytes of real traffic received in this session
worker chan func() // Channel to send work to the session worker worker chan func() // Channel to send work to the session worker
recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
} }
func (sinfo *sessionInfo) doWorker(f func()) { func (sinfo *sessionInfo) doWorker(f func()) {
@ -101,7 +102,14 @@ func (s *sessionInfo) update(p *sessionPing) bool {
} }
s.time = time.Now() s.time = time.Now()
s.tstamp = p.Tstamp s.tstamp = p.Tstamp
s.init = true s.reset = false
defer func() { recover() }() // Recover if the below panics
select {
case <-s.init:
default:
// Unblock anything waiting for the session to initialize
close(s.init)
}
return true return true
} }
@ -203,6 +211,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.mtuTime = now sinfo.mtuTime = now
sinfo.pingTime = now sinfo.pingTime = now
sinfo.pingSend = now sinfo.pingSend = now
sinfo.init = make(chan struct{})
higher := false higher := false
for idx := range ss.core.boxPub { for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
@ -410,10 +419,10 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) {
// Resets all sessions to an uninitialized state. // Resets all sessions to an uninitialized state.
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
func (ss *sessions) resetInits() { func (ss *sessions) reset() {
for _, sinfo := range ss.sinfos { for _, sinfo := range ss.sinfos {
sinfo.doWorker(func() { sinfo.doWorker(func() {
sinfo.init = false sinfo.reset = true
}) })
} }
} }