2017-12-29 04:16:20 +00:00
package yggdrasil
// TODO cleanup, this file is kind of a mess
2018-01-26 23:30:51 +00:00
// Commented code should be removed
// Live code should be better commented
2017-12-29 04:16:20 +00:00
2018-01-26 23:30:51 +00:00
// FIXME (!) this part may be at least sligtly vulnerable to replay attacks
2017-12-29 04:16:20 +00:00
// The switch message part should catch / drop old tstamps
// So the damage is limited
// But you could still mess up msgAnc / msgHops and break some things there
2018-01-26 23:30:51 +00:00
// It needs to ignore messages with a lower seq
// Probably best to start setting seq to a timestamp in that case...
// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon
// That could happen with a peer over a high-latency link, with many msgHops
// Possible workarounds:
// 1. Pre-emptively send all hops when one is requested, or after any change
// Maybe requires changing how the throttle works and msgHops are saved
// In case some arrive out of order or are dropped
// This is relatively easy to implement, but could be wasteful
// 2. Save your old locator, sigs, etc, so you can respond to older ancs
// And finish requesting an old anc before updating to a new one
// But that may lead to other issues if not done carefully...
2017-12-29 04:16:20 +00:00
import "time"
import "sync"
import "sync/atomic"
2018-01-04 22:37:51 +00:00
2017-12-29 04:16:20 +00:00
//import "fmt"
type peers struct {
2018-05-07 00:01:52 +00:00
core * Core
mutex sync . Mutex // Synchronize writes to atomic
ports atomic . Value //map[Port]*peer, use CoW semantics
2018-01-04 22:37:51 +00:00
//ports map[Port]*peer
2018-05-23 10:28:20 +00:00
authMutex sync . RWMutex
allowedEncryptionPublicKeys map [ boxPubKey ] struct { }
2017-12-29 04:16:20 +00:00
}
func ( ps * peers ) init ( c * Core ) {
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
ps . putPorts ( make ( map [ switchPort ] * peer ) )
ps . core = c
2018-05-23 10:28:20 +00:00
ps . allowedEncryptionPublicKeys = make ( map [ boxPubKey ] struct { } )
2018-05-06 21:32:34 +00:00
}
2018-05-23 10:28:20 +00:00
func ( ps * peers ) isAllowedEncryptionPublicKey ( box * boxPubKey ) bool {
2018-05-07 00:01:52 +00:00
ps . authMutex . RLock ( )
defer ps . authMutex . RUnlock ( )
2018-05-23 10:28:20 +00:00
_ , isIn := ps . allowedEncryptionPublicKeys [ * box ]
return isIn || len ( ps . allowedEncryptionPublicKeys ) == 0
2017-12-29 04:16:20 +00:00
}
2018-05-23 10:28:20 +00:00
func ( ps * peers ) addAllowedEncryptionPublicKey ( box * boxPubKey ) {
2018-05-07 00:01:52 +00:00
ps . authMutex . Lock ( )
defer ps . authMutex . Unlock ( )
2018-05-23 10:28:20 +00:00
ps . allowedEncryptionPublicKeys [ * box ] = struct { } { }
2018-05-07 00:01:52 +00:00
}
2018-05-23 10:28:20 +00:00
func ( ps * peers ) removeAllowedEncryptionPublicKey ( box * boxPubKey ) {
2018-05-07 00:01:52 +00:00
ps . authMutex . Lock ( )
defer ps . authMutex . Unlock ( )
2018-05-23 10:28:20 +00:00
delete ( ps . allowedEncryptionPublicKeys , * box )
2018-05-07 00:01:52 +00:00
}
2018-05-23 10:28:20 +00:00
func ( ps * peers ) getAllowedEncryptionPublicKeys ( ) [ ] boxPubKey {
2018-05-07 00:01:52 +00:00
ps . authMutex . RLock ( )
defer ps . authMutex . RUnlock ( )
2018-05-23 10:28:20 +00:00
keys := make ( [ ] boxPubKey , 0 , len ( ps . allowedEncryptionPublicKeys ) )
for key := range ps . allowedEncryptionPublicKeys {
2018-05-07 00:01:52 +00:00
keys = append ( keys , key )
}
return keys
}
2017-12-29 04:16:20 +00:00
func ( ps * peers ) getPorts ( ) map [ switchPort ] * peer {
2018-01-04 22:37:51 +00:00
return ps . ports . Load ( ) . ( map [ switchPort ] * peer )
2017-12-29 04:16:20 +00:00
}
func ( ps * peers ) putPorts ( ports map [ switchPort ] * peer ) {
2018-01-04 22:37:51 +00:00
ps . ports . Store ( ports )
2017-12-29 04:16:20 +00:00
}
type peer struct {
2018-01-21 18:55:45 +00:00
// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
2018-01-04 22:37:51 +00:00
// use get/update methods only! (atomic accessors as float64)
2018-05-27 18:37:35 +00:00
queueSize int64
2018-05-19 01:41:02 +00:00
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
2018-01-04 22:37:51 +00:00
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
2018-05-19 01:41:02 +00:00
firstSeen time . Time // To track uptime for getPeers
box boxPubKey
sig sigPubKey
shared boxSharedKey
2018-01-04 22:37:51 +00:00
//in <-chan []byte
//out chan<- []byte
//in func([]byte)
2018-06-07 02:18:21 +00:00
out func ( [ ] byte )
core * Core
port switchPort
2018-01-04 22:37:51 +00:00
// This is used to limit how often we perform expensive operations
2018-06-07 03:39:22 +00:00
throttle uint8 // TODO apply this sanely
2018-05-05 22:14:03 +00:00
// Called when a peer is removed, to close the underlying connection, or via admin api
close func ( )
2018-05-06 22:17:12 +00:00
// To allow the peer to call close if idle for too long
2018-06-07 03:39:22 +00:00
lastAnc time . Time // TODO? rename and use this
2018-06-06 22:44:10 +00:00
// used for protocol traffic (to bypass queues)
linkOut ( chan [ ] byte )
2018-06-07 05:16:47 +00:00
lastMsg [ ] byte // last switchMsg accepted
doSend ( chan struct { } ) // tell the linkLoop to send a switchMsg
2017-12-29 04:16:20 +00:00
}
2018-01-04 22:37:51 +00:00
2017-12-29 04:16:20 +00:00
const peer_Throttle = 1
2018-05-27 18:37:35 +00:00
func ( p * peer ) getQueueSize ( ) int64 {
return atomic . LoadInt64 ( & p . queueSize )
2017-12-29 04:16:20 +00:00
}
2018-05-27 18:37:35 +00:00
func ( p * peer ) updateQueueSize ( delta int64 ) {
atomic . AddInt64 ( & p . queueSize , delta )
2017-12-29 04:16:20 +00:00
}
2018-06-06 22:44:10 +00:00
func ( ps * peers ) newPeer ( box * boxPubKey , sig * sigPubKey ) * peer {
2018-05-19 01:41:02 +00:00
now := time . Now ( )
2018-01-04 22:37:51 +00:00
p := peer { box : * box ,
2018-05-19 01:41:02 +00:00
sig : * sig ,
shared : * getSharedKey ( & ps . core . boxPriv , box ) ,
lastAnc : now ,
firstSeen : now ,
2018-06-07 05:16:47 +00:00
doSend : make ( chan struct { } , 1 ) ,
2018-05-19 01:41:02 +00:00
core : ps . core }
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
oldPorts := ps . getPorts ( )
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
for idx := switchPort ( 0 ) ; true ; idx ++ {
if _ , isIn := newPorts [ idx ] ; ! isIn {
p . port = switchPort ( idx )
newPorts [ p . port ] = & p
break
}
}
ps . putPorts ( newPorts )
return & p
2017-12-29 04:16:20 +00:00
}
2018-05-05 22:14:03 +00:00
func ( ps * peers ) removePeer ( port switchPort ) {
if port == 0 {
return
} // Can't remove self peer
2018-06-07 04:23:16 +00:00
ps . core . router . doAdmin ( func ( ) {
ps . core . switchTable . removePeer ( port )
} )
2018-05-05 22:14:03 +00:00
ps . mutex . Lock ( )
oldPorts := ps . getPorts ( )
p , isIn := oldPorts [ port ]
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
delete ( newPorts , port )
ps . putPorts ( newPorts )
ps . mutex . Unlock ( )
2018-06-07 04:23:16 +00:00
if isIn {
if p . close != nil {
p . close ( )
}
2018-06-07 05:49:06 +00:00
close ( p . doSend )
2018-05-05 22:14:03 +00:00
}
}
2018-06-07 05:16:47 +00:00
func ( ps * peers ) sendSwitchMsgs ( ) {
ports := ps . getPorts ( )
for _ , p := range ports {
if p . port == 0 {
continue
}
select {
case p . doSend <- struct { } { } :
default :
}
}
}
func ( ps * peers ) fixSwitchAfterPeerDisconnect ( ) {
// TODO something better, this is very wasteful
ports := ps . getPorts ( )
for _ , p := range ports {
if p . lastMsg == nil {
continue
}
p . handleSwitchMsg ( p . lastMsg )
}
}
2018-06-06 22:44:10 +00:00
func ( p * peer ) linkLoop ( ) {
2018-06-07 05:16:47 +00:00
go func ( ) { p . doSend <- struct { } { } } ( )
2018-06-07 05:49:06 +00:00
for range p . doSend {
p . sendSwitchMsg ( )
2018-01-04 22:37:51 +00:00
}
2017-12-29 04:16:20 +00:00
}
2018-06-06 22:44:10 +00:00
func ( p * peer ) handlePacket ( packet [ ] byte ) {
2018-05-19 01:41:02 +00:00
// TODO See comment in sendPacket about atomics technically being done wrong
atomic . AddUint64 ( & p . bytesRecvd , uint64 ( len ( packet ) ) )
2018-01-04 22:37:51 +00:00
pType , pTypeLen := wire_decode_uint64 ( packet )
if pTypeLen == 0 {
return
}
switch pType {
case wire_Traffic :
p . handleTraffic ( packet , pTypeLen )
case wire_ProtocolTraffic :
p . handleTraffic ( packet , pTypeLen )
case wire_LinkProtocolTraffic :
2018-06-07 05:49:06 +00:00
p . handleLinkTraffic ( packet )
default :
2018-01-04 22:37:51 +00:00
return
}
2017-12-29 04:16:20 +00:00
}
func ( p * peer ) handleTraffic ( packet [ ] byte , pTypeLen int ) {
2018-06-07 05:16:47 +00:00
if p . port != 0 && p . lastMsg == nil {
// Drop traffic until the peer manages to send us at least one good switchMsg
return
}
2018-01-04 22:37:51 +00:00
ttl , ttlLen := wire_decode_uint64 ( packet [ pTypeLen : ] )
ttlBegin := pTypeLen
ttlEnd := pTypeLen + ttlLen
coords , coordLen := wire_decode_coords ( packet [ ttlEnd : ] )
coordEnd := ttlEnd + coordLen
if coordEnd == len ( packet ) {
return
} // No payload
toPort , newTTL := p . core . switchTable . lookup ( coords , ttl )
if toPort == p . port {
return
2018-01-26 23:30:51 +00:00
}
2018-01-04 22:37:51 +00:00
to := p . core . peers . getPorts ( ) [ toPort ]
if to == nil {
return
}
// This mutates the packet in-place if the length of the TTL changes!
2018-02-18 03:59:08 +00:00
ttlSlice := wire_encode_uint64 ( newTTL )
newTTLLen := len ( ttlSlice )
2018-01-13 13:26:26 +00:00
shift := ttlLen - newTTLLen
2018-01-04 22:37:51 +00:00
copy ( packet [ shift : ] , packet [ : pTypeLen ] )
2018-02-18 03:59:08 +00:00
copy ( packet [ ttlBegin + shift : ] , ttlSlice )
2018-01-04 22:37:51 +00:00
packet = packet [ shift : ]
to . sendPacket ( packet )
2017-12-29 04:16:20 +00:00
}
func ( p * peer ) sendPacket ( packet [ ] byte ) {
2018-01-04 22:37:51 +00:00
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
p . out ( packet )
2018-05-19 01:41:02 +00:00
// TODO this should really happen at the interface, to account for LIFO packet drops and additional per-packet/per-message overhead, but this should be pretty close... better to move it to the tcp/udp stuff *after* rewriting both to give a common interface
atomic . AddUint64 ( & p . bytesSent , uint64 ( len ( packet ) ) )
2017-12-29 04:16:20 +00:00
}
func ( p * peer ) sendLinkPacket ( packet [ ] byte ) {
2018-01-04 22:37:51 +00:00
bs , nonce := boxSeal ( & p . shared , packet , nil )
linkPacket := wire_linkProtoTrafficPacket {
2018-06-02 20:21:05 +00:00
Nonce : * nonce ,
Payload : bs ,
2018-01-04 22:37:51 +00:00
}
packet = linkPacket . encode ( )
2018-06-06 22:44:10 +00:00
p . linkOut <- packet
2017-12-29 04:16:20 +00:00
}
func ( p * peer ) handleLinkTraffic ( bs [ ] byte ) {
2018-01-04 22:37:51 +00:00
packet := wire_linkProtoTrafficPacket { }
if ! packet . decode ( bs ) {
return
}
2018-06-02 20:21:05 +00:00
payload , isOK := boxOpen ( & p . shared , packet . Payload , & packet . Nonce )
2018-01-04 22:37:51 +00:00
if ! isOK {
return
}
pType , pTypeLen := wire_decode_uint64 ( payload )
if pTypeLen == 0 {
return
}
switch pType {
2018-06-07 02:11:10 +00:00
case wire_SwitchMsg :
p . handleSwitchMsg ( payload )
2018-06-07 02:18:21 +00:00
default : // TODO?...
2018-01-04 22:37:51 +00:00
}
2018-06-07 02:11:10 +00:00
}
func ( p * peer ) sendSwitchMsg ( ) {
info , sigs := p . core . switchTable . createMessage ( p . port )
var msg switchMsg
msg . Root , msg . TStamp = info . locator . root , info . locator . tstamp
for idx , sig := range sigs {
hop := switchMsgHop {
Port : info . locator . coords [ idx ] ,
Next : sig . next ,
Sig : sig . sig ,
}
msg . Hops = append ( msg . Hops , hop )
}
2018-06-07 03:39:22 +00:00
bs := getBytesForSig ( & p . sig , & info . locator )
2018-06-07 02:11:10 +00:00
msg . Hops = append ( msg . Hops , switchMsgHop {
Port : p . port ,
Next : p . sig ,
2018-06-07 03:39:22 +00:00
Sig : * sign ( & p . core . sigPriv , bs ) ,
2018-06-07 02:11:10 +00:00
} )
packet := msg . encode ( )
//p.core.log.Println("Encoded msg:", msg, "; bytes:", packet)
p . sendLinkPacket ( packet )
}
func ( p * peer ) handleSwitchMsg ( packet [ ] byte ) {
var msg switchMsg
msg . decode ( packet )
//p.core.log.Println("Decoded msg:", msg, "; bytes:", packet)
if len ( msg . Hops ) < 1 {
p . throttle ++
panic ( "FIXME testing" )
return
}
var info switchMessage
var sigs [ ] sigInfo
info . locator . root = msg . Root
info . locator . tstamp = msg . TStamp
2018-06-07 03:39:22 +00:00
prevKey := msg . Root
2018-06-07 02:11:10 +00:00
for _ , hop := range msg . Hops {
2018-06-07 03:39:22 +00:00
// Build locator and signatures
2018-06-07 02:11:10 +00:00
var sig sigInfo
sig . next = hop . Next
sig . sig = hop . Sig
sigs = append ( sigs , sig )
info . locator . coords = append ( info . locator . coords , hop . Port )
2018-06-07 03:39:22 +00:00
// Check signature
bs := getBytesForSig ( & sig . next , & info . locator )
if ! p . core . sigs . check ( & prevKey , & sig . sig , bs ) {
p . throttle ++
panic ( "FIXME testing" )
return
2018-06-07 02:11:10 +00:00
}
2018-06-07 03:39:22 +00:00
prevKey = sig . next
2018-06-07 02:11:10 +00:00
}
info . from = p . sig
info . seq = uint64 ( time . Now ( ) . Unix ( ) )
p . core . switchTable . handleMessage ( & info , p . port , sigs )
// Pass a mesage to the dht informing it that this peer (still) exists
l := info . locator
l . coords = l . coords [ : len ( l . coords ) - 1 ]
2018-01-04 22:37:51 +00:00
dinfo := dhtInfo {
key : p . box ,
2018-06-07 02:11:10 +00:00
coords : l . getCoords ( ) ,
2018-01-04 22:37:51 +00:00
}
p . core . dht . peers <- & dinfo
2018-06-07 05:16:47 +00:00
p . lastMsg = packet
2017-12-29 04:16:20 +00:00
}
func getBytesForSig ( next * sigPubKey , loc * switchLocator ) [ ] byte {
2018-01-04 22:37:51 +00:00
bs := append ( [ ] byte ( nil ) , next [ : ] ... )
2018-06-07 04:00:17 +00:00
bs = append ( bs , loc . root [ : ] ... )
bs = append ( bs , wire_encode_uint64 ( wire_intToUint ( loc . tstamp ) ) ... )
bs = append ( bs , wire_encode_coords ( loc . getCoords ( ) ) ... )
2018-01-04 22:37:51 +00:00
return bs
2017-12-29 04:16:20 +00:00
}