2019-04-18 15:38:24 +00:00
package yggdrasil
import (
"errors"
2019-04-21 11:28:46 +00:00
"fmt"
2019-04-20 10:53:38 +00:00
"sync"
2019-04-22 10:20:35 +00:00
"sync/atomic"
2019-04-18 15:38:24 +00:00
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
2019-04-18 22:38:23 +00:00
"github.com/yggdrasil-network/yggdrasil-go/src/util"
2019-04-18 15:38:24 +00:00
)
2019-05-29 19:16:17 +00:00
// ConnError implements the net.Error interface. It wraps an underlying error
// and records whether that error is a timeout, whether it is temporary, and,
// for oversized packets, the maximum packet size that should be used instead.
//
// The methods use value receivers so that both ConnError and *ConnError carry
// them: errors of this type are handed out by value, and with pointer
// receivers those values would not satisfy net.Error.
type ConnError struct {
	error
	timeout   bool
	temporary bool
	maxsize   int
}

// Timeout returns true if the error relates to a timeout condition on the
// connection.
func (e ConnError) Timeout() bool {
	return e.timeout
}

// Temporary returns true if the error is temporary or false if it is a
// permanent error condition.
func (e ConnError) Temporary() bool {
	return e.temporary
}

// PacketTooBig reports whether the error was returned in response to sending
// a packet that is too large and, if so, the maximum supported packet size
// that should be used for the connection. The size is only meaningful when the
// returned bool is true.
func (e ConnError) PacketTooBig() (bool, int) {
	return e.maxsize > 0, e.maxsize
}
2019-04-18 22:38:23 +00:00
type Conn struct {
core * Core
nodeID * crypto . NodeID
nodeMask * crypto . NodeID
2019-04-26 23:07:57 +00:00
mutex sync . RWMutex
2019-04-27 00:31:47 +00:00
closed bool
2019-04-22 01:38:14 +00:00
session * sessionInfo
2019-04-27 00:31:47 +00:00
readDeadline atomic . Value // time.Time // TODO timer
writeDeadline atomic . Value // time.Time // TODO timer
searching atomic . Value // bool
searchwait chan struct { } // Never reset this, it's only used for the initial search
2019-05-24 01:27:52 +00:00
writebuf [ ] [ ] byte // Packets to be sent if/when the search finishes
2019-04-26 23:07:57 +00:00
}
2019-04-27 00:31:47 +00:00
// TODO func NewConn() that initializes additional fields as needed
2019-04-26 23:07:57 +00:00
func newConn ( core * Core , nodeID * crypto . NodeID , nodeMask * crypto . NodeID , session * sessionInfo ) * Conn {
conn := Conn {
core : core ,
nodeID : nodeID ,
nodeMask : nodeMask ,
session : session ,
searchwait : make ( chan struct { } ) ,
}
conn . searching . Store ( false )
return & conn
2019-04-18 22:38:23 +00:00
}
2019-04-21 11:28:46 +00:00
func ( c * Conn ) String ( ) string {
2019-04-22 14:00:19 +00:00
return fmt . Sprintf ( "conn=%p" , c )
2019-04-21 11:28:46 +00:00
}
2019-04-18 22:38:23 +00:00
// This method should only be called from the router goroutine
func ( c * Conn ) startSearch ( ) {
2019-04-22 19:06:39 +00:00
// The searchCompleted callback is given to the search
2019-04-18 22:38:23 +00:00
searchCompleted := func ( sinfo * sessionInfo , err error ) {
2019-05-24 01:27:52 +00:00
defer c . searching . Store ( false )
2019-04-22 19:06:39 +00:00
// If the search failed for some reason, e.g. it hit a dead end or timed
// out, then do nothing
2019-04-18 22:38:23 +00:00
if err != nil {
2019-04-22 14:00:19 +00:00
c . core . log . Debugln ( c . String ( ) , "DHT search failed:" , err )
2019-04-18 22:38:23 +00:00
return
}
2019-04-22 19:06:39 +00:00
// Take the connection mutex
c . mutex . Lock ( )
defer c . mutex . Unlock ( )
2019-04-27 00:31:47 +00:00
// Were we successfully given a sessionInfo pointer?
2019-04-18 22:38:23 +00:00
if sinfo != nil {
2019-04-22 19:06:39 +00:00
// Store it, and update the nodeID and nodeMask (which may have been
// wildcarded before now) with their complete counterparts
2019-04-22 14:00:19 +00:00
c . core . log . Debugln ( c . String ( ) , "DHT search completed" )
2019-04-18 22:38:23 +00:00
c . session = sinfo
2019-04-22 19:06:39 +00:00
c . nodeID = crypto . GetNodeID ( & sinfo . theirPermPub )
for i := range c . nodeMask {
c . nodeMask [ i ] = 0xFF
}
2019-04-27 00:31:47 +00:00
// Make sure that any blocks on read/write operations are lifted
defer func ( ) { recover ( ) } ( ) // So duplicate searches don't panic
close ( c . searchwait )
2019-04-22 14:00:19 +00:00
} else {
2019-04-22 19:06:39 +00:00
// No session was returned - this shouldn't really happen because we
// should always return an error reason if we don't return a session
panic ( "DHT search didn't return an error or a sessionInfo" )
2019-04-18 22:38:23 +00:00
}
2019-04-27 00:31:47 +00:00
if c . closed {
// Things were closed before the search returned
// Go ahead and close it again to make sure the session is cleaned up
go c . Close ( )
2019-05-24 01:27:52 +00:00
} else {
// Send any messages we may have buffered
var msgs [ ] [ ] byte
msgs , c . writebuf = c . writebuf , nil
go func ( ) {
for _ , msg := range msgs {
c . Write ( msg )
util . PutBytes ( msg )
}
} ( )
2019-04-27 00:31:47 +00:00
}
2019-04-18 22:38:23 +00:00
}
2019-04-22 19:06:39 +00:00
// doSearch will be called below in response to one or more conditions
2019-04-18 15:38:24 +00:00
doSearch := func ( ) {
2019-04-22 14:00:19 +00:00
c . searching . Store ( true )
2019-04-22 19:06:39 +00:00
// Check to see if there is a search already matching the destination
2019-04-18 22:38:23 +00:00
sinfo , isIn := c . core . searches . searches [ * c . nodeID ]
2019-04-18 15:38:24 +00:00
if ! isIn {
2019-04-22 19:06:39 +00:00
// Nothing was found, so create a new search
2019-04-18 22:38:23 +00:00
sinfo = c . core . searches . newIterSearch ( c . nodeID , c . nodeMask , searchCompleted )
2019-04-22 14:00:19 +00:00
c . core . log . Debugf ( "%s DHT search started: %p" , c . String ( ) , sinfo )
2019-04-18 15:38:24 +00:00
}
2019-04-22 19:06:39 +00:00
// Continue the search
2019-04-18 22:38:23 +00:00
c . core . searches . continueSearch ( sinfo )
2019-04-18 15:38:24 +00:00
}
2019-04-22 19:06:39 +00:00
// Take a copy of the session object, in case it changes later
2019-04-22 01:38:14 +00:00
c . mutex . RLock ( )
2019-04-22 14:00:19 +00:00
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-22 01:38:14 +00:00
if c . session == nil {
2019-04-22 19:06:39 +00:00
// No session object is present so previous searches, if we ran any, have
// not yielded a useful result (dead end, remote host not found)
2019-04-18 15:38:24 +00:00
doSearch ( )
2019-04-22 01:38:14 +00:00
} else {
sinfo . worker <- func ( ) {
switch {
case ! sinfo . init :
doSearch ( )
case time . Since ( sinfo . time ) > 6 * time . Second :
if sinfo . time . Before ( sinfo . pingTime ) && time . Since ( sinfo . pingTime ) > 6 * time . Second {
// TODO double check that the above condition is correct
doSearch ( )
} else {
c . core . sessions . ping ( sinfo )
}
default : // Don't do anything, to keep traffic throttled
2019-04-18 15:38:24 +00:00
}
}
}
}
2019-04-27 00:31:47 +00:00
func getDeadlineTimer ( value * atomic . Value ) * time . Timer {
2019-04-27 03:42:05 +00:00
timer := time . NewTimer ( 24 * 365 * time . Hour ) // FIXME for some reason setting this to 0 doesn't always let it stop and drain the channel correctly
2019-04-27 00:31:47 +00:00
util . TimerStop ( timer )
if deadline , ok := value . Load ( ) . ( time . Time ) ; ok {
timer . Reset ( time . Until ( deadline ) )
}
return timer
}
2019-04-18 15:38:24 +00:00
func ( c * Conn ) Read ( b [ ] byte ) ( int , error ) {
2019-04-22 14:00:19 +00:00
// Take a copy of the session object
2019-04-22 10:20:35 +00:00
c . mutex . RLock ( )
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-27 00:31:47 +00:00
timer := getDeadlineTimer ( & c . readDeadline )
defer util . TimerStop ( timer )
2019-04-22 22:12:13 +00:00
// If there is a search in progress then wait for the result
2019-04-22 22:58:59 +00:00
if sinfo == nil {
// Wait for the search to complete
2019-04-27 00:31:47 +00:00
select {
case <- c . searchwait :
case <- timer . C :
2019-05-29 19:16:17 +00:00
return 0 , ConnError { errors . New ( "timeout" ) , true , false , 0 }
2019-04-27 00:31:47 +00:00
}
2019-04-22 22:58:59 +00:00
// Retrieve our session info again
c . mutex . RLock ( )
sinfo = c . session
c . mutex . RUnlock ( )
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0 , errors . New ( "search failed" )
}
2019-04-22 14:00:19 +00:00
}
2019-05-31 22:51:01 +00:00
for {
// Wait for some traffic to come through from the session
select {
case <- timer . C :
return 0 , ConnError { errors . New ( "timeout" ) , true , false , 0 }
case p , ok := <- sinfo . recv :
// If the session is closed then do nothing
if ! ok {
return 0 , errors . New ( "session is closed" )
}
defer util . PutBytes ( p . Payload )
var err error
done := make ( chan struct { } )
workerFunc := func ( ) {
defer close ( done )
// If the nonce is bad then drop the packet and return an error
if ! sinfo . nonceIsOK ( & p . Nonce ) {
err = ConnError { errors . New ( "packet dropped due to invalid nonce" ) , false , true , 0 }
return
}
// Decrypt the packet
bs , isOK := crypto . BoxOpen ( & sinfo . sharedSesKey , p . Payload , & p . Nonce )
defer util . PutBytes ( bs ) // FIXME commenting this out leads to illegal buffer reuse, this implies there's a memory error somewhere and that this is just flooding things out of the finite pool of old slices that get reused
// Check if we were unable to decrypt the packet for some reason and
// return an error if we couldn't
if ! isOK {
err = ConnError { errors . New ( "packet dropped due to decryption failure" ) , false , true , 0 }
return
}
// Return the newly decrypted buffer back to the slice we were given
copy ( b , bs )
// Trim the slice down to size based on the data we received
if len ( bs ) < len ( b ) {
b = b [ : len ( bs ) ]
}
// Update the session
sinfo . updateNonce ( & p . Nonce )
sinfo . time = time . Now ( )
sinfo . bytesRecvd += uint64 ( len ( b ) )
2019-04-19 20:23:15 +00:00
}
2019-05-31 22:51:01 +00:00
// Hand over to the session worker
select { // Send to worker
case sinfo . worker <- workerFunc :
case <- timer . C :
return 0 , ConnError { errors . New ( "timeout" ) , true , false , 0 }
2019-04-19 20:23:15 +00:00
}
2019-05-31 22:51:01 +00:00
<- done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
// Something went wrong in the session worker so abort
if err != nil {
if ce , ok := err . ( * ConnError ) ; ok && ce . Temporary ( ) {
continue
}
return 0 , err
2019-04-19 22:47:11 +00:00
}
2019-05-31 22:51:01 +00:00
// If we've reached this point then everything went to plan, return the
// number of bytes we populated back into the given slice
return len ( b ) , nil
2019-04-19 09:55:15 +00:00
}
2019-04-18 22:38:23 +00:00
}
2019-04-18 15:38:24 +00:00
}
2019-04-19 09:55:15 +00:00
func ( c * Conn ) Write ( b [ ] byte ) ( bytesWritten int , err error ) {
2019-04-22 10:20:35 +00:00
c . mutex . RLock ( )
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-22 14:00:19 +00:00
// If the session doesn't exist, or isn't initialised (which probably means
2019-04-22 22:58:59 +00:00
// that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again
2019-04-22 19:06:39 +00:00
if sinfo == nil || ! sinfo . init {
2019-04-22 14:00:19 +00:00
// Is a search already taking place?
if searching , sok := c . searching . Load ( ) . ( bool ) ; ! sok || ( sok && ! searching ) {
// No search was already taking place so start a new one
2019-04-27 02:49:11 +00:00
c . core . router . doAdmin ( c . startSearch )
2019-04-22 21:38:37 +00:00
}
2019-05-24 01:27:52 +00:00
// Buffer the packet to be sent if/when the search is finished
c . mutex . Lock ( )
defer c . mutex . Unlock ( )
c . writebuf = append ( c . writebuf , append ( util . GetBytes ( ) , b ... ) )
for len ( c . writebuf ) > 32 {
util . PutBytes ( c . writebuf [ 0 ] )
c . writebuf = c . writebuf [ 1 : ]
2019-04-22 14:00:19 +00:00
}
2019-05-24 01:27:52 +00:00
return len ( b ) , nil
2019-06-11 03:09:12 +00:00
} else {
// This triggers some session keepalive traffic
// FIXME this desparately needs to be refactored, since the ping case needlessly goes through the router goroutine just to have it pass a function to the session worker when it determines that a session already exists.
c . core . router . doAdmin ( c . startSearch )
2019-04-18 22:38:23 +00:00
}
2019-04-22 01:38:14 +00:00
var packet [ ] byte
2019-04-27 00:31:47 +00:00
done := make ( chan struct { } )
2019-05-29 19:16:17 +00:00
written := len ( b )
2019-04-27 00:31:47 +00:00
workerFunc := func ( ) {
defer close ( done )
2019-05-29 19:16:17 +00:00
// Does the packet exceed the permitted size for the session?
if uint16 ( len ( b ) ) > sinfo . getMTU ( ) {
written , err = 0 , ConnError { errors . New ( "packet too big" ) , true , false , int ( sinfo . getMTU ( ) ) }
return
}
2019-04-22 14:00:19 +00:00
// Encrypt the packet
2019-04-22 01:38:14 +00:00
payload , nonce := crypto . BoxSeal ( & sinfo . sharedSesKey , b , & sinfo . myNonce )
defer util . PutBytes ( payload )
2019-04-22 14:00:19 +00:00
// Construct the wire packet to send to the router
2019-04-22 01:38:14 +00:00
p := wire_trafficPacket {
Coords : sinfo . coords ,
Handle : sinfo . theirHandle ,
Nonce : * nonce ,
Payload : payload ,
}
packet = p . encode ( )
sinfo . bytesSent += uint64 ( len ( b ) )
2019-04-27 00:31:47 +00:00
}
2019-05-24 01:27:52 +00:00
// Set up a timer so this doesn't block forever
timer := getDeadlineTimer ( & c . writeDeadline )
defer util . TimerStop ( timer )
2019-04-27 00:31:47 +00:00
// Hand over to the session worker
select { // Send to worker
case sinfo . worker <- workerFunc :
case <- timer . C :
2019-05-29 19:16:17 +00:00
return 0 , ConnError { errors . New ( "timeout" ) , true , false , 0 }
2019-04-27 00:31:47 +00:00
}
2019-05-24 01:27:52 +00:00
// Wait for the worker to finish, otherwise there are memory errors ([Get||Put]Bytes stuff)
<- done
2019-04-22 14:00:19 +00:00
// Give the packet to the router
2019-05-29 19:16:17 +00:00
if written > 0 {
sinfo . core . router . out ( packet )
}
2019-04-22 14:00:19 +00:00
// Finally return the number of bytes we wrote
2019-05-29 19:16:17 +00:00
return written , err
2019-04-18 15:38:24 +00:00
}
func ( c * Conn ) Close ( ) error {
2019-04-27 00:31:47 +00:00
c . mutex . Lock ( )
defer c . mutex . Unlock ( )
if c . session != nil {
// Close the session, if it hasn't been closed already
c . session . close ( )
c . session = nil
}
2019-04-22 14:00:19 +00:00
// This can't fail yet - TODO?
2019-04-27 00:31:47 +00:00
c . closed = true
2019-04-18 15:38:24 +00:00
return nil
}
// LocalAddr returns the node ID derived from the local core's box public key.
//
// The key is read through c.core rather than through the session: the session
// pointer is nil before a search completes and after Close, so going through
// it could dereference nil. This presumes the session, when present, belongs
// to the same core as the connection.
func (c *Conn) LocalAddr() crypto.NodeID {
	return *crypto.GetNodeID(&c.core.boxPub)
}
func ( c * Conn ) RemoteAddr ( ) crypto . NodeID {
2019-04-20 15:32:27 +00:00
c . mutex . RLock ( )
defer c . mutex . RUnlock ( )
return * c . nodeID
2019-04-18 15:38:24 +00:00
}
// SetDeadline sets both the read and the write deadline to t. Both setters are
// always invoked; if either reports an error, the first one is returned
// instead of being silently discarded.
func (c *Conn) SetDeadline(t time.Time) error {
	readErr := c.SetReadDeadline(t)
	writeErr := c.SetWriteDeadline(t)
	if readErr != nil {
		return readErr
	}
	return writeErr
}
func ( c * Conn ) SetReadDeadline ( t time . Time ) error {
2019-04-22 10:20:35 +00:00
c . readDeadline . Store ( t )
2019-04-18 15:38:24 +00:00
return nil
}
func ( c * Conn ) SetWriteDeadline ( t time . Time ) error {
2019-04-22 10:20:35 +00:00
c . writeDeadline . Store ( t )
2019-04-18 15:38:24 +00:00
return nil
}