5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-15 14:20:29 +00:00
yggdrasil-go/src/yggdrasil/metadata.go

165 lines
4.0 KiB
Go
Raw Normal View History

2018-12-15 00:48:27 +00:00
package yggdrasil
import (
2018-12-15 11:38:51 +00:00
"encoding/json"
2018-12-15 11:15:48 +00:00
"errors"
2018-12-15 11:38:51 +00:00
"runtime"
2018-12-15 00:48:27 +00:00
"sync"
"time"
)
type metadata struct {
core *Core
myMetadata metadataPayload
myMetadataMutex sync.RWMutex
callbacks map[boxPubKey]metadataCallback
2018-12-15 10:39:31 +00:00
callbacksMutex sync.Mutex
2018-12-15 11:15:48 +00:00
cache map[boxPubKey]metadataCached
2018-12-15 10:39:31 +00:00
cacheMutex sync.RWMutex
2018-12-15 00:48:27 +00:00
}
type metadataPayload []byte
2018-12-15 11:15:48 +00:00
type metadataCached struct {
payload metadataPayload
created time.Time
}
2018-12-15 00:48:27 +00:00
type metadataCallback struct {
call func(meta *metadataPayload)
created time.Time
}
2018-12-15 11:15:48 +00:00
// Initialises the metadata cache/callback maps, and starts a goroutine to keep
// the cache/callback maps clean of stale entries
2018-12-15 00:48:27 +00:00
func (m *metadata) init(core *Core) {
m.core = core
m.callbacks = make(map[boxPubKey]metadataCallback)
2018-12-15 11:15:48 +00:00
m.cache = make(map[boxPubKey]metadataCached)
2018-12-15 00:48:27 +00:00
go func() {
for {
2018-12-15 11:15:48 +00:00
m.callbacksMutex.Lock()
2018-12-15 00:48:27 +00:00
for boxPubKey, callback := range m.callbacks {
if time.Since(callback.created) > time.Minute {
delete(m.callbacks, boxPubKey)
}
}
2018-12-15 11:15:48 +00:00
m.callbacksMutex.Unlock()
m.cacheMutex.Lock()
for boxPubKey, cache := range m.cache {
if time.Since(cache.created) > time.Hour {
delete(m.cache, boxPubKey)
}
}
m.cacheMutex.Unlock()
time.Sleep(time.Second * 30)
2018-12-15 00:48:27 +00:00
}
}()
}
2018-12-15 11:15:48 +00:00
// Add a callback for a metadata lookup
2018-12-15 10:39:31 +00:00
func (m *metadata) addCallback(sender boxPubKey, call func(meta *metadataPayload)) {
m.callbacksMutex.Lock()
defer m.callbacksMutex.Unlock()
m.callbacks[sender] = metadataCallback{
created: time.Now(),
call: call,
}
}
2018-12-15 00:48:27 +00:00
// Handles the callback, if there is one
func (m *metadata) callback(sender boxPubKey, meta metadataPayload) {
2018-12-15 10:39:31 +00:00
m.callbacksMutex.Lock()
defer m.callbacksMutex.Unlock()
2018-12-15 00:48:27 +00:00
if callback, ok := m.callbacks[sender]; ok {
callback.call(&meta)
delete(m.callbacks, sender)
}
}
2018-12-15 11:15:48 +00:00
// Get the current node's metadata
2018-12-15 00:48:27 +00:00
func (m *metadata) getMetadata() metadataPayload {
m.myMetadataMutex.RLock()
defer m.myMetadataMutex.RUnlock()
return m.myMetadata
}
2018-12-15 11:15:48 +00:00
// Set the current node's metadata
2018-12-15 11:38:51 +00:00
func (m *metadata) setMetadata(given interface{}) error {
2018-12-15 00:48:27 +00:00
m.myMetadataMutex.Lock()
defer m.myMetadataMutex.Unlock()
2018-12-15 11:38:51 +00:00
newmeta := map[string]interface{}{
"buildname": GetBuildName(),
"buildversion": GetBuildVersion(),
"buildplatform": runtime.GOOS,
"buildarch": runtime.GOARCH,
}
if metamap, ok := given.(map[string]interface{}); ok {
for key, value := range metamap {
if _, ok := newmeta[key]; ok {
continue
}
newmeta[key] = value
}
}
if newjson, err := json.Marshal(newmeta); err == nil {
m.myMetadata = newjson
return nil
} else {
return err
}
2018-12-15 00:48:27 +00:00
}
2018-12-15 10:39:31 +00:00
// Add metadata into the cache for a node
func (m *metadata) addCachedMetadata(key boxPubKey, payload metadataPayload) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
2018-12-15 11:15:48 +00:00
m.cache[key] = metadataCached{
created: time.Now(),
payload: payload,
}
2018-12-15 10:39:31 +00:00
}
// Get a metadata entry from the cache
2018-12-15 11:15:48 +00:00
func (m *metadata) getCachedMetadata(key boxPubKey) (metadataPayload, error) {
2018-12-15 10:39:31 +00:00
m.cacheMutex.RLock()
defer m.cacheMutex.RUnlock()
if meta, ok := m.cache[key]; ok {
2018-12-15 11:15:48 +00:00
return meta.payload, nil
2018-12-15 10:39:31 +00:00
}
2018-12-15 11:15:48 +00:00
return metadataPayload{}, errors.New("No cache entry found")
2018-12-15 10:39:31 +00:00
}
2018-12-15 11:15:48 +00:00
// Handles a meta request/response - called from the router
2018-12-15 00:48:27 +00:00
func (m *metadata) handleMetadata(meta *sessionMeta) {
if meta.IsResponse {
2018-12-15 10:39:31 +00:00
m.callback(meta.SendPermPub, meta.Metadata)
m.addCachedMetadata(meta.SendPermPub, meta.Metadata)
2018-12-15 00:48:27 +00:00
} else {
m.sendMetadata(meta.SendPermPub, meta.SendCoords, true)
}
}
2018-12-15 11:15:48 +00:00
// Send metadata request or response - called from the router
2018-12-15 00:48:27 +00:00
func (m *metadata) sendMetadata(key boxPubKey, coords []byte, isResponse bool) {
table := m.core.switchTable.table.Load().(lookupTable)
meta := sessionMeta{
SendCoords: table.self.getCoords(),
IsResponse: isResponse,
Metadata: m.core.metadata.getMetadata(),
}
bs := meta.encode()
shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
Coords: coords,
ToKey: key,
FromKey: m.core.boxPub,
Nonce: *nonce,
Payload: payload,
}
packet := p.encode()
m.core.router.out(packet)
}