5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 07:30:27 +00:00

Metadata exchange without sessions

This commit is contained in:
Neil Alexander 2018-12-15 00:48:27 +00:00
parent affddfb2fe
commit d5031a5cb6
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
6 changed files with 187 additions and 113 deletions

View File

@ -322,6 +322,14 @@ func (a *admin) init(c *Core, listenaddr string) {
return admin_info{}, err return admin_info{}, err
} }
}) })
a.addHandler("getMeta", []string{"box_pub_key", "coords"}, func(in admin_info) (admin_info, error) {
result, err := a.admin_getMeta(in["box_pub_key"].(string), in["coords"].(string))
if err == nil {
return admin_info{"metadata": string(result)}, nil
} else {
return admin_info{}, err
}
})
} }
// start runs the admin API socket to listen for / respond to admin API calls. // start runs the admin API socket to listen for / respond to admin API calls.
@ -806,6 +814,50 @@ func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtR
return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
} }
func (a *admin) admin_getMeta(keyString, coordString string) (metadataPayload, error) {
var key boxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return metadataPayload{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return metadataPayload{}, err
} else {
coords = append(coords, uint8(u64))
}
}
response := make(chan *metadataPayload, 1)
sendMetaRequest := func() {
a.core.metadata.callbacks[key] = metadataCallback{
created: time.Now(),
call: func(meta *metadataPayload) {
defer func() { recover() }()
select {
case response <- meta:
default:
}
},
}
a.core.metadata.sendMetadata(key, coords, false)
}
a.core.router.doAdmin(sendMetaRequest)
go func() {
time.Sleep(6 * time.Second)
close(response)
}()
for res := range response {
return *res, nil
}
return metadataPayload{}, errors.New(fmt.Sprintf("getMeta timeout: %s", keyString))
}
// getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network.
// This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions.
// The graph is structured as a tree with directed links leading away from the root. // The graph is structured as a tree with directed links leading away from the root.

View File

@ -32,6 +32,7 @@ type Core struct {
admin admin admin admin
searches searches searches searches
multicast multicast multicast multicast
metadata metadata
tcp tcpInterface tcp tcpInterface
log *log.Logger log *log.Logger
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
@ -124,6 +125,9 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) c.init(&boxPub, &boxPriv, &sigPub, &sigPriv)
c.admin.init(c, nc.AdminListen) c.admin.init(c, nc.AdminListen)
c.metadata.init(c)
c.metadata.setMetadata(metadataPayload("HIYA, THIS IS METADATA"))
if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil { if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil {
c.log.Println("Failed to start TCP interface") c.log.Println("Failed to start TCP interface")
return err return err
@ -138,7 +142,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err return err
} }
c.sessions.setMetadata(metadata("HIYA, THIS IS METADATA"))
c.sessions.setSessionFirewallState(nc.SessionFirewall.Enable) c.sessions.setSessionFirewallState(nc.SessionFirewall.Enable)
c.sessions.setSessionFirewallDefaults( c.sessions.setSessionFirewallDefaults(
nc.SessionFirewall.AllowFromDirect, nc.SessionFirewall.AllowFromDirect,
@ -241,13 +244,13 @@ func (c *Core) GetSubnet() *net.IPNet {
} }
// Gets the node metadata. // Gets the node metadata.
func (c *Core) GetMetadata() metadata { func (c *Core) GetMetadata() metadataPayload {
return c.sessions.getMetadata() return c.metadata.getMetadata()
} }
// Sets the node metadata. // Sets the node metadata.
func (c *Core) SetMetadata(meta metadata) { func (c *Core) SetMetadata(meta metadataPayload) {
c.sessions.setMetadata(meta) c.metadata.setMetadata(meta)
} }
// Sets the output logger of the Yggdrasil node after startup. This may be // Sets the output logger of the Yggdrasil node after startup. This may be

92
src/yggdrasil/metadata.go Normal file
View File

@ -0,0 +1,92 @@
package yggdrasil
import (
"sync"
"time"
)
type metadata struct {
core *Core
myMetadata metadataPayload
myMetadataMutex sync.RWMutex
callbacks map[boxPubKey]metadataCallback
cache map[boxPubKey]metadataPayload
}
type metadataPayload []byte
type metadataCallback struct {
call func(meta *metadataPayload)
created time.Time
}
// Initialises the metadata cache/callback stuff
func (m *metadata) init(core *Core) {
m.core = core
m.callbacks = make(map[boxPubKey]metadataCallback)
m.cache = make(map[boxPubKey]metadataPayload)
go func() {
for {
for boxPubKey, callback := range m.callbacks {
if time.Since(callback.created) > time.Minute {
delete(m.callbacks, boxPubKey)
}
}
time.Sleep(time.Second * 5)
}
}()
}
// Handles the callback, if there is one
func (m *metadata) callback(sender boxPubKey, meta metadataPayload) {
if callback, ok := m.callbacks[sender]; ok {
callback.call(&meta)
delete(m.callbacks, sender)
}
}
// Get the metadata
func (m *metadata) getMetadata() metadataPayload {
m.myMetadataMutex.RLock()
defer m.myMetadataMutex.RUnlock()
return m.myMetadata
}
// Set the metadata
func (m *metadata) setMetadata(meta metadataPayload) {
m.myMetadataMutex.Lock()
defer m.myMetadataMutex.Unlock()
m.myMetadata = meta
}
// Handles a meta request/response.
func (m *metadata) handleMetadata(meta *sessionMeta) {
if meta.IsResponse {
m.core.metadata.callback(meta.SendPermPub, meta.Metadata)
} else {
m.sendMetadata(meta.SendPermPub, meta.SendCoords, true)
}
}
// Send metadata request or response
func (m *metadata) sendMetadata(key boxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable)
meta := sessionMeta{
SendCoords: table.self.getCoords(),
IsResponse: isResponse,
Metadata: m.core.metadata.getMetadata(),
}
bs := meta.encode()
shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: coords,
ToKey: key,
FromKey: m.core.boxPub,
Nonce: *nonce,
Payload: payload,
}
packet := p.encode()
m.core.router.out(packet)
}

View File

@ -483,7 +483,7 @@ func (r *router) handleMetadata(bs []byte, fromKey *boxPubKey) {
return return
} }
req.SendPermPub = *fromKey req.SendPermPub = *fromKey
r.core.sessions.handleMetadata(&req) r.core.metadata.handleMetadata(&req)
} }
// Passed a function to call. // Passed a function to call.

View File

@ -7,7 +7,6 @@ package yggdrasil
import ( import (
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"sync"
"time" "time"
) )
@ -26,9 +25,6 @@ type sessionInfo struct {
myHandle handle myHandle handle
theirNonce boxNonce theirNonce boxNonce
myNonce boxNonce myNonce boxNonce
metaReqTime time.Time
metaResTime time.Time
theirMetadata metadata
theirMTU uint16 theirMTU uint16
myMTU uint16 myMTU uint16
wasMTUFixed bool // Was the MTU fixed by a receive error? wasMTUFixed bool // Was the MTU fixed by a receive error?
@ -61,12 +57,11 @@ type sessionPing struct {
// Represents a session metadata packet. // Represents a session metadata packet.
type sessionMeta struct { type sessionMeta struct {
SendPermPub boxPubKey // Sender's permanent key SendPermPub boxPubKey // Sender's permanent key
SendCoords []byte // Sender's coords
IsResponse bool IsResponse bool
Metadata metadata Metadata metadataPayload
} }
type metadata []byte
// Updates session info in response to a ping, after checking that the ping is OK. // Updates session info in response to a ping, after checking that the ping is OK.
// Returns true if the session was updated, or false otherwise. // Returns true if the session was updated, or false otherwise.
func (s *sessionInfo) update(p *sessionPing) bool { func (s *sessionInfo) update(p *sessionPing) bool {
@ -128,9 +123,6 @@ type sessions struct {
sessionFirewallAlwaysAllowsOutbound bool sessionFirewallAlwaysAllowsOutbound bool
sessionFirewallWhitelist []string sessionFirewallWhitelist []string
sessionFirewallBlacklist []string sessionFirewallBlacklist []string
// Metadata for this node
myMetadata metadata
myMetadataMutex sync.RWMutex
} }
// Initializes the session struct. // Initializes the session struct.
@ -145,20 +137,6 @@ func (ss *sessions) init(core *Core) {
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }
// Get the metadata
func (ss *sessions) getMetadata() metadata {
ss.myMetadataMutex.RLock()
defer ss.myMetadataMutex.RUnlock()
return ss.myMetadata
}
// Set the metadata
func (ss *sessions) setMetadata(meta metadata) {
ss.myMetadataMutex.Lock()
defer ss.myMetadataMutex.Unlock()
ss.myMetadata = meta
}
// Enable or disable the session firewall // Enable or disable the session firewall
func (ss *sessions) setSessionFirewallState(enabled bool) { func (ss *sessions) setSessionFirewallState(enabled bool) {
ss.sessionFirewallEnabled = enabled ss.sessionFirewallEnabled = enabled
@ -496,60 +474,6 @@ func (ss *sessions) handlePing(ping *sessionPing) {
bs, sinfo.packet = sinfo.packet, nil bs, sinfo.packet = sinfo.packet, nil
ss.core.router.sendPacket(bs) ss.core.router.sendPacket(bs)
} }
// This requests metadata from the remote side fairly quickly after
// establishing the session, and if other time constraints apply (no more
// often than 15 minutes since receiving the last metadata)
//if time.Since(sinfo.metaResTime).Minutes() > 15 {
// if time.Since(sinfo.metaReqTime).Minutes() > 1 {
// ss.sendMetadata(sinfo, false)
// }
//}
}
func (ss *sessions) sendMetadata(sinfo *sessionInfo, isResponse bool) {
ss.myMetadataMutex.RLock()
meta := sessionMeta{
IsResponse: isResponse,
Metadata: ss.myMetadata,
}
ss.myMetadataMutex.RUnlock()
bs := meta.encode()
shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: sinfo.coords,
ToKey: sinfo.theirPermPub,
FromKey: ss.core.boxPub,
Nonce: *nonce,
Payload: payload,
}
packet := p.encode()
ss.core.router.out(packet)
if !isResponse {
sinfo.metaReqTime = time.Now()
}
}
// Handles a meta request/response.
func (ss *sessions) handleMetadata(meta *sessionMeta) {
// Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&meta.SendPermPub)
// Check the session firewall
if !isIn && ss.sessionFirewallEnabled {
if !ss.isSessionAllowed(&meta.SendPermPub, false) {
return
}
}
if !isIn || sinfo.timedout() {
return
}
if meta.IsResponse {
sinfo.theirMetadata = meta.Metadata
sinfo.metaResTime = time.Now()
} else {
ss.sendMetadata(sinfo, true)
}
} }
// Used to subtract one nonce from another, staying in the range +- 64. // Used to subtract one nonce from another, staying in the range +- 64.

View File

@ -364,6 +364,7 @@ func (p *sessionMeta) encode() []byte {
pTypeVal = wire_SessionMetaRequest pTypeVal = wire_SessionMetaRequest
} }
bs := wire_encode_uint64(pTypeVal) bs := wire_encode_uint64(pTypeVal)
bs = wire_put_coords(p.SendCoords, bs)
if pTypeVal == wire_SessionMetaResponse { if pTypeVal == wire_SessionMetaResponse {
bs = append(bs, p.Metadata...) bs = append(bs, p.Metadata...)
} }
@ -378,12 +379,14 @@ func (p *sessionMeta) decode(bs []byte) bool {
return false return false
case pType != wire_SessionMetaRequest && pType != wire_SessionMetaResponse: case pType != wire_SessionMetaRequest && pType != wire_SessionMetaResponse:
return false return false
case !wire_chop_coords(&p.SendCoords, &bs):
return false
} }
if p.IsResponse = pType == wire_SessionMetaResponse; p.IsResponse { if p.IsResponse = pType == wire_SessionMetaResponse; p.IsResponse {
if len(bs) == 0 { if len(bs) == 0 {
return false return false
} }
p.Metadata = make(metadata, len(bs)) p.Metadata = make(metadataPayload, len(bs))
if !wire_chop_slice(p.Metadata[:], &bs) { if !wire_chop_slice(p.Metadata[:], &bs) {
return false return false
} }