2019-04-18 15:38:24 +00:00
package yggdrasil
import (
"errors"
2019-04-21 11:28:46 +00:00
"fmt"
2019-04-20 10:53:38 +00:00
"sync"
2019-04-22 10:20:35 +00:00
"sync/atomic"
2019-04-18 15:38:24 +00:00
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
2019-04-18 22:38:23 +00:00
"github.com/yggdrasil-network/yggdrasil-go/src/util"
2019-04-18 15:38:24 +00:00
)
2019-04-27 00:31:47 +00:00
// Error implements the net.Error interface
type ConnError struct {
error
timeout bool
temporary bool
}
func ( e * ConnError ) Timeout ( ) bool {
return e . timeout
}
func ( e * ConnError ) Temporary ( ) bool {
return e . temporary
}
2019-04-18 22:38:23 +00:00
type Conn struct {
core * Core
nodeID * crypto . NodeID
nodeMask * crypto . NodeID
2019-04-26 23:07:57 +00:00
mutex sync . RWMutex
2019-04-27 00:31:47 +00:00
closed bool
2019-04-22 01:38:14 +00:00
session * sessionInfo
2019-04-27 00:31:47 +00:00
readDeadline atomic . Value // time.Time // TODO timer
writeDeadline atomic . Value // time.Time // TODO timer
searching atomic . Value // bool
searchwait chan struct { } // Never reset this, it's only used for the initial search
2019-04-26 23:07:57 +00:00
}
2019-04-27 00:31:47 +00:00
// TODO func NewConn() that initializes additional fields as needed
2019-04-26 23:07:57 +00:00
func newConn ( core * Core , nodeID * crypto . NodeID , nodeMask * crypto . NodeID , session * sessionInfo ) * Conn {
conn := Conn {
core : core ,
nodeID : nodeID ,
nodeMask : nodeMask ,
session : session ,
searchwait : make ( chan struct { } ) ,
}
conn . searching . Store ( false )
return & conn
2019-04-18 22:38:23 +00:00
}
2019-04-21 11:28:46 +00:00
func ( c * Conn ) String ( ) string {
2019-04-22 14:00:19 +00:00
return fmt . Sprintf ( "conn=%p" , c )
2019-04-21 11:28:46 +00:00
}
2019-04-18 22:38:23 +00:00
// This method should only be called from the router goroutine
func ( c * Conn ) startSearch ( ) {
2019-04-22 19:06:39 +00:00
// The searchCompleted callback is given to the search
2019-04-18 22:38:23 +00:00
searchCompleted := func ( sinfo * sessionInfo , err error ) {
2019-04-22 19:06:39 +00:00
// If the search failed for some reason, e.g. it hit a dead end or timed
// out, then do nothing
2019-04-18 22:38:23 +00:00
if err != nil {
2019-04-22 14:00:19 +00:00
c . core . log . Debugln ( c . String ( ) , "DHT search failed:" , err )
2019-04-27 00:31:47 +00:00
go func ( ) {
time . Sleep ( time . Second )
c . mutex . RLock ( )
closed := c . closed
c . mutex . RUnlock ( )
if ! closed {
// Restart the search, or else Write can stay blocked forever
c . core . router . admin <- c . startSearch
}
} ( )
2019-04-18 22:38:23 +00:00
return
}
2019-04-27 02:49:11 +00:00
defer c . searching . Store ( false )
2019-04-22 19:06:39 +00:00
// Take the connection mutex
c . mutex . Lock ( )
defer c . mutex . Unlock ( )
2019-04-27 00:31:47 +00:00
// Were we successfully given a sessionInfo pointer?
2019-04-18 22:38:23 +00:00
if sinfo != nil {
2019-04-22 19:06:39 +00:00
// Store it, and update the nodeID and nodeMask (which may have been
// wildcarded before now) with their complete counterparts
2019-04-22 14:00:19 +00:00
c . core . log . Debugln ( c . String ( ) , "DHT search completed" )
2019-04-18 22:38:23 +00:00
c . session = sinfo
2019-04-22 19:06:39 +00:00
c . nodeID = crypto . GetNodeID ( & sinfo . theirPermPub )
for i := range c . nodeMask {
c . nodeMask [ i ] = 0xFF
}
2019-04-27 00:31:47 +00:00
// Make sure that any blocks on read/write operations are lifted
defer func ( ) { recover ( ) } ( ) // So duplicate searches don't panic
close ( c . searchwait )
2019-04-22 14:00:19 +00:00
} else {
2019-04-22 19:06:39 +00:00
// No session was returned - this shouldn't really happen because we
// should always return an error reason if we don't return a session
panic ( "DHT search didn't return an error or a sessionInfo" )
2019-04-18 22:38:23 +00:00
}
2019-04-27 00:31:47 +00:00
if c . closed {
// Things were closed before the search returned
// Go ahead and close it again to make sure the session is cleaned up
go c . Close ( )
}
2019-04-18 22:38:23 +00:00
}
2019-04-22 19:06:39 +00:00
// doSearch will be called below in response to one or more conditions
2019-04-18 15:38:24 +00:00
doSearch := func ( ) {
2019-04-22 14:00:19 +00:00
c . searching . Store ( true )
2019-04-22 19:06:39 +00:00
// Check to see if there is a search already matching the destination
2019-04-18 22:38:23 +00:00
sinfo , isIn := c . core . searches . searches [ * c . nodeID ]
2019-04-18 15:38:24 +00:00
if ! isIn {
2019-04-22 19:06:39 +00:00
// Nothing was found, so create a new search
2019-04-18 22:38:23 +00:00
sinfo = c . core . searches . newIterSearch ( c . nodeID , c . nodeMask , searchCompleted )
2019-04-22 14:00:19 +00:00
c . core . log . Debugf ( "%s DHT search started: %p" , c . String ( ) , sinfo )
2019-04-18 15:38:24 +00:00
}
2019-04-22 19:06:39 +00:00
// Continue the search
2019-04-18 22:38:23 +00:00
c . core . searches . continueSearch ( sinfo )
2019-04-18 15:38:24 +00:00
}
2019-04-22 19:06:39 +00:00
// Take a copy of the session object, in case it changes later
2019-04-22 01:38:14 +00:00
c . mutex . RLock ( )
2019-04-22 14:00:19 +00:00
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-22 01:38:14 +00:00
if c . session == nil {
2019-04-22 19:06:39 +00:00
// No session object is present so previous searches, if we ran any, have
// not yielded a useful result (dead end, remote host not found)
2019-04-18 15:38:24 +00:00
doSearch ( )
2019-04-22 01:38:14 +00:00
} else {
sinfo . worker <- func ( ) {
switch {
case ! sinfo . init :
doSearch ( )
case time . Since ( sinfo . time ) > 6 * time . Second :
if sinfo . time . Before ( sinfo . pingTime ) && time . Since ( sinfo . pingTime ) > 6 * time . Second {
// TODO double check that the above condition is correct
doSearch ( )
} else {
c . core . sessions . ping ( sinfo )
}
default : // Don't do anything, to keep traffic throttled
2019-04-18 15:38:24 +00:00
}
}
}
}
2019-04-27 00:31:47 +00:00
func getDeadlineTimer ( value * atomic . Value ) * time . Timer {
timer := time . NewTimer ( 0 )
util . TimerStop ( timer )
if deadline , ok := value . Load ( ) . ( time . Time ) ; ok {
timer . Reset ( time . Until ( deadline ) )
}
return timer
}
2019-04-18 15:38:24 +00:00
func ( c * Conn ) Read ( b [ ] byte ) ( int , error ) {
2019-04-22 14:00:19 +00:00
// Take a copy of the session object
2019-04-22 10:20:35 +00:00
c . mutex . RLock ( )
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-27 00:31:47 +00:00
timer := getDeadlineTimer ( & c . readDeadline )
defer util . TimerStop ( timer )
2019-04-22 22:12:13 +00:00
// If there is a search in progress then wait for the result
2019-04-22 22:58:59 +00:00
if sinfo == nil {
// Wait for the search to complete
2019-04-27 00:31:47 +00:00
select {
case <- c . searchwait :
case <- timer . C :
return 0 , ConnError { errors . New ( "Timeout" ) , true , false }
}
2019-04-22 22:58:59 +00:00
// Retrieve our session info again
c . mutex . RLock ( )
sinfo = c . session
c . mutex . RUnlock ( )
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0 , errors . New ( "search failed" )
}
2019-04-22 14:00:19 +00:00
}
// Wait for some traffic to come through from the session
2019-04-19 09:55:15 +00:00
select {
2019-04-27 02:49:11 +00:00
case <- timer . C :
return 0 , ConnError { errors . New ( "Timeout" ) , true , false }
2019-04-22 22:58:59 +00:00
case p , ok := <- sinfo . recv :
2019-04-22 19:06:39 +00:00
// If the session is closed then do nothing
2019-04-19 09:55:15 +00:00
if ! ok {
2019-04-19 22:30:43 +00:00
return 0 , errors . New ( "session is closed" )
2019-04-19 09:55:15 +00:00
}
defer util . PutBytes ( p . Payload )
2019-04-22 01:38:14 +00:00
var err error
2019-04-27 00:31:47 +00:00
done := make ( chan struct { } )
workerFunc := func ( ) {
defer close ( done )
2019-04-22 14:00:19 +00:00
// If the nonce is bad then drop the packet and return an error
2019-04-22 01:38:14 +00:00
if ! sinfo . nonceIsOK ( & p . Nonce ) {
err = errors . New ( "packet dropped due to invalid nonce" )
return
2019-04-19 20:23:15 +00:00
}
2019-04-22 14:00:19 +00:00
// Decrypt the packet
2019-04-22 01:38:14 +00:00
bs , isOK := crypto . BoxOpen ( & sinfo . sharedSesKey , p . Payload , & p . Nonce )
2019-04-27 03:21:31 +00:00
defer util . PutBytes ( bs ) // FIXME commenting this out leads to illegal buffer reuse, this implies there's a memory error somewhere and that this is just flooding things out of the finite pool of old slices that get reused
2019-04-22 14:00:19 +00:00
// Check if we were unable to decrypt the packet for some reason and
// return an error if we couldn't
2019-04-19 20:23:15 +00:00
if ! isOK {
2019-04-22 01:38:14 +00:00
err = errors . New ( "packet dropped due to decryption failure" )
return
2019-04-19 20:23:15 +00:00
}
2019-04-22 14:00:19 +00:00
// Return the newly decrypted buffer back to the slice we were given
2019-04-19 22:47:11 +00:00
copy ( b , bs )
2019-04-22 14:00:19 +00:00
// Trim the slice down to size based on the data we received
2019-04-19 22:47:11 +00:00
if len ( bs ) < len ( b ) {
b = b [ : len ( bs ) ]
}
2019-04-22 14:00:19 +00:00
// Update the session
2019-04-22 01:38:14 +00:00
sinfo . updateNonce ( & p . Nonce )
sinfo . time = time . Now ( )
sinfo . bytesRecvd += uint64 ( len ( b ) )
2019-04-27 00:31:47 +00:00
}
// Hand over to the session worker
select { // Send to worker
case sinfo . worker <- workerFunc :
case <- timer . C :
return 0 , ConnError { errors . New ( "Timeout" ) , true , false }
}
2019-04-27 03:21:31 +00:00
<- done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
2019-04-22 14:00:19 +00:00
// Something went wrong in the session worker so abort
2019-04-19 20:23:15 +00:00
if err != nil {
return 0 , err
2019-04-19 09:55:15 +00:00
}
2019-04-22 14:00:19 +00:00
// If we've reached this point then everything went to plan, return the
// number of bytes we populated back into the given slice
2019-04-19 09:55:15 +00:00
return len ( b ) , nil
2019-04-18 22:38:23 +00:00
}
2019-04-18 15:38:24 +00:00
}
2019-04-19 09:55:15 +00:00
func ( c * Conn ) Write ( b [ ] byte ) ( bytesWritten int , err error ) {
2019-04-22 10:20:35 +00:00
c . mutex . RLock ( )
sinfo := c . session
c . mutex . RUnlock ( )
2019-04-27 00:31:47 +00:00
timer := getDeadlineTimer ( & c . writeDeadline )
defer util . TimerStop ( timer )
2019-04-22 14:00:19 +00:00
// If the session doesn't exist, or isn't initialised (which probably means
2019-04-22 22:58:59 +00:00
// that the search didn't complete successfully) then we may need to wait for
// the search to complete or start the search again
2019-04-22 19:06:39 +00:00
if sinfo == nil || ! sinfo . init {
2019-04-22 14:00:19 +00:00
// Is a search already taking place?
if searching , sok := c . searching . Load ( ) . ( bool ) ; ! sok || ( sok && ! searching ) {
// No search was already taking place so start a new one
2019-04-27 02:49:11 +00:00
c . core . router . doAdmin ( c . startSearch )
2019-04-22 21:38:37 +00:00
}
2019-04-22 22:58:59 +00:00
// Wait for the search to complete
2019-04-27 00:31:47 +00:00
select {
case <- c . searchwait :
case <- timer . C :
return 0 , ConnError { errors . New ( "Timeout" ) , true , false }
}
2019-04-22 22:58:59 +00:00
// Retrieve our session info again
c . mutex . RLock ( )
sinfo = c . session
c . mutex . RUnlock ( )
// If sinfo is still nil at this point then the search failed and the
// searchwait channel has been recreated, so might as well give up and
// return an error code
if sinfo == nil {
return 0 , errors . New ( "search failed" )
2019-04-22 14:00:19 +00:00
}
2019-04-18 22:38:23 +00:00
}
2019-04-22 01:38:14 +00:00
var packet [ ] byte
2019-04-27 00:31:47 +00:00
done := make ( chan struct { } )
workerFunc := func ( ) {
defer close ( done )
2019-04-22 14:00:19 +00:00
// Encrypt the packet
2019-04-22 01:38:14 +00:00
payload , nonce := crypto . BoxSeal ( & sinfo . sharedSesKey , b , & sinfo . myNonce )
defer util . PutBytes ( payload )
2019-04-22 14:00:19 +00:00
// Construct the wire packet to send to the router
2019-04-22 01:38:14 +00:00
p := wire_trafficPacket {
Coords : sinfo . coords ,
Handle : sinfo . theirHandle ,
Nonce : * nonce ,
Payload : payload ,
}
packet = p . encode ( )
sinfo . bytesSent += uint64 ( len ( b ) )
2019-04-27 00:31:47 +00:00
}
// Hand over to the session worker
select { // Send to worker
case sinfo . worker <- workerFunc :
case <- timer . C :
return 0 , ConnError { errors . New ( "Timeout" ) , true , false }
}
2019-04-27 03:21:31 +00:00
<- done // Wait for the worker to finish, failing this can cause memory errors (util.[Get||Put]Bytes stuff)
2019-04-22 14:00:19 +00:00
// Give the packet to the router
2019-04-22 01:38:14 +00:00
sinfo . core . router . out ( packet )
2019-04-22 14:00:19 +00:00
// Finally return the number of bytes we wrote
2019-04-18 22:38:23 +00:00
return len ( b ) , nil
2019-04-18 15:38:24 +00:00
}
func ( c * Conn ) Close ( ) error {
2019-04-27 00:31:47 +00:00
c . mutex . Lock ( )
defer c . mutex . Unlock ( )
if c . session != nil {
// Close the session, if it hasn't been closed already
c . session . close ( )
c . session = nil
}
2019-04-22 14:00:19 +00:00
// This can't fail yet - TODO?
2019-04-27 00:31:47 +00:00
c . closed = true
2019-04-18 15:38:24 +00:00
return nil
}
func ( c * Conn ) LocalAddr ( ) crypto . NodeID {
return * crypto . GetNodeID ( & c . session . core . boxPub )
}
func ( c * Conn ) RemoteAddr ( ) crypto . NodeID {
2019-04-20 15:32:27 +00:00
c . mutex . RLock ( )
defer c . mutex . RUnlock ( )
return * c . nodeID
2019-04-18 15:38:24 +00:00
}
func ( c * Conn ) SetDeadline ( t time . Time ) error {
c . SetReadDeadline ( t )
c . SetWriteDeadline ( t )
return nil
}
func ( c * Conn ) SetReadDeadline ( t time . Time ) error {
2019-04-22 10:20:35 +00:00
c . readDeadline . Store ( t )
2019-04-18 15:38:24 +00:00
return nil
}
func ( c * Conn ) SetWriteDeadline ( t time . Time ) error {
2019-04-22 10:20:35 +00:00
c . writeDeadline . Store ( t )
2019-04-18 15:38:24 +00:00
return nil
}