mirror of
https://github.com/cwinfo/matterbridge.git
synced 2025-07-03 08:27:44 +00:00
Refactor and update RocketChat bridge
* Add support for editing/deleting messages * Add support for uploading files * Add support for avatars * Use the Rocket.Chat.Go.SDK * Use the rest and streaming api
This commit is contained in:
24
vendor/github.com/gopackage/ddp/.gitignore
generated
vendored
Normal file
24
vendor/github.com/gopackage/ddp/.gitignore
generated
vendored
Normal file
@ -0,0 +1,24 @@
|
||||
# Compiled Object files, Static and Dynamic libs (Shared Objects)
|
||||
*.o
|
||||
*.a
|
||||
*.so
|
||||
|
||||
# Folders
|
||||
_obj
|
||||
_test
|
||||
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
[568vq].out
|
||||
|
||||
*.cgo1.go
|
||||
*.cgo2.c
|
||||
_cgo_defun.c
|
||||
_cgo_gotypes.go
|
||||
_cgo_export.*
|
||||
|
||||
_testmain.go
|
||||
|
||||
*.exe
|
||||
*.test
|
||||
*.prof
|
13
vendor/github.com/gopackage/ddp/LICENSE
generated
vendored
Normal file
13
vendor/github.com/gopackage/ddp/LICENSE
generated
vendored
Normal file
@ -0,0 +1,13 @@
|
||||
Copyright (c) 2015, Metamech LLC.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
3
vendor/github.com/gopackage/ddp/README.md
generated
vendored
Normal file
3
vendor/github.com/gopackage/ddp/README.md
generated
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
# ddp
|
||||
|
||||
MeteorJS DDP library for Golang
|
79
vendor/github.com/gopackage/ddp/ddp.go
generated
vendored
Normal file
79
vendor/github.com/gopackage/ddp/ddp.go
generated
vendored
Normal file
@ -0,0 +1,79 @@
|
||||
// Package ddp implements the MeteorJS DDP protocol over websockets. Fallback
|
||||
// to longpolling is NOT supported (and is not planned on ever being supported
|
||||
// by this library). We will try to model the library after `net/http` - right
|
||||
// now the library is barebones and doesn't provide the pluggability of http.
|
||||
// However, that's the goal for the package eventually.
|
||||
package ddp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// debugLog is true if we should log debugging information about the connection
|
||||
var debugLog = true
|
||||
|
||||
// The main file contains common utility types.
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// idManager provides simple incrementing IDs for ddp messages.
|
||||
type idManager struct {
|
||||
// nextID is the next ID for API calls
|
||||
nextID uint64
|
||||
// idMutex is a mutex to protect ID updates
|
||||
idMutex *sync.Mutex
|
||||
}
|
||||
|
||||
// newidManager creates a new instance and sets up resources.
|
||||
func newidManager() *idManager {
|
||||
return &idManager{idMutex: new(sync.Mutex)}
|
||||
}
|
||||
|
||||
// newID issues a new ID for use in calls.
|
||||
func (id *idManager) newID() string {
|
||||
id.idMutex.Lock()
|
||||
next := id.nextID
|
||||
id.nextID++
|
||||
id.idMutex.Unlock()
|
||||
return fmt.Sprintf("%x", next)
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// pingTracker tracks in-flight pings.
|
||||
type pingTracker struct {
|
||||
handler func(error)
|
||||
timeout time.Duration
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// Call represents an active RPC call.
|
||||
type Call struct {
|
||||
ID string // The uuid for this method call
|
||||
ServiceMethod string // The name of the service and method to call.
|
||||
Args interface{} // The argument to the function (*struct).
|
||||
Reply interface{} // The reply from the function (*struct).
|
||||
Error error // After completion, the error status.
|
||||
Done chan *Call // Strobes when call is complete.
|
||||
Owner *Client // Client that owns the method call
|
||||
}
|
||||
|
||||
// done removes the call from any owners and strobes the done channel with itself.
|
||||
func (call *Call) done() {
|
||||
delete(call.Owner.calls, call.ID)
|
||||
select {
|
||||
case call.Done <- call:
|
||||
// ok
|
||||
default:
|
||||
// We don't want to block here. It is the caller's responsibility to make
|
||||
// sure the channel has enough buffer space. See comment in Go().
|
||||
if debugLog {
|
||||
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
|
||||
}
|
||||
}
|
||||
}
|
654
vendor/github.com/gopackage/ddp/ddp_client.go
generated
vendored
Normal file
654
vendor/github.com/gopackage/ddp/ddp_client.go
generated
vendored
Normal file
@ -0,0 +1,654 @@
|
||||
package ddp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
"errors"
|
||||
)
|
||||
|
||||
const (
|
||||
DISCONNECTED = iota
|
||||
DIALING
|
||||
CONNECTING
|
||||
CONNECTED
|
||||
)
|
||||
|
||||
type ConnectionListener interface {
|
||||
Connected()
|
||||
}
|
||||
|
||||
type ConnectionNotifier interface {
|
||||
AddConnectionListener(listener ConnectionListener)
|
||||
}
|
||||
|
||||
type StatusListener interface {
|
||||
Status(status int)
|
||||
}
|
||||
|
||||
type StatusNotifier interface {
|
||||
AddStatusListener(listener StatusListener)
|
||||
}
|
||||
|
||||
// Client represents a DDP client connection. The DDP client establish a DDP
|
||||
// session and acts as a message pump for other tools.
|
||||
type Client struct {
|
||||
// HeartbeatInterval is the time between heartbeats to send
|
||||
HeartbeatInterval time.Duration
|
||||
// HeartbeatTimeout is the time for a heartbeat ping to timeout
|
||||
HeartbeatTimeout time.Duration
|
||||
// ReconnectInterval is the time between reconnections on bad connections
|
||||
ReconnectInterval time.Duration
|
||||
|
||||
// writeStats controls statistics gathering for current websocket writes.
|
||||
writeSocketStats *WriterStats
|
||||
// writeStats controls statistics gathering for overall client writes.
|
||||
writeStats *WriterStats
|
||||
// writeLog controls logging for client writes.
|
||||
writeLog *WriterLogger
|
||||
// readStats controls statistics gathering for current websocket reads.
|
||||
readSocketStats *ReaderStats
|
||||
// readStats controls statistics gathering for overall client reads.
|
||||
readStats *ReaderStats
|
||||
// readLog control logging for clietn reads.
|
||||
readLog *ReaderLogger
|
||||
// reconnects in the number of reconnections the client has made
|
||||
reconnects int64
|
||||
// pingsIn is the number of pings received from the server
|
||||
pingsIn int64
|
||||
// pingsOut is te number of pings sent by the client
|
||||
pingsOut int64
|
||||
|
||||
// session contains the DDP session token (can be used for reconnects and debugging).
|
||||
session string
|
||||
// version contains the negotiated DDP protocol version in use.
|
||||
version string
|
||||
// serverID the cluster node ID for the server we connected to
|
||||
serverID string
|
||||
// ws is the underlying websocket being used.
|
||||
ws *websocket.Conn
|
||||
// encoder is a JSON encoder to send outgoing packets to the websocket.
|
||||
encoder *json.Encoder
|
||||
// url the URL the websocket is connected to
|
||||
url string
|
||||
// origin is the origin for the websocket connection
|
||||
origin string
|
||||
// inbox is an incoming message channel
|
||||
inbox chan map[string]interface{}
|
||||
// errors is an incoming errors channel
|
||||
errors chan error
|
||||
// pingTimer is a timer for sending regular pings to the server
|
||||
pingTimer *time.Timer
|
||||
// pings tracks inflight pings based on each ping ID.
|
||||
pings map[string][]*pingTracker
|
||||
// calls tracks method invocations that are still in flight
|
||||
calls map[string]*Call
|
||||
// subs tracks active subscriptions. Map contains name->args
|
||||
subs map[string]*Call
|
||||
// collections contains all the collections currently subscribed
|
||||
collections map[string]Collection
|
||||
// connectionStatus is the current connection status of the client
|
||||
connectionStatus int
|
||||
// reconnectTimer is the timer tracking reconnections
|
||||
reconnectTimer *time.Timer
|
||||
// reconnectLock protects access to reconnection
|
||||
reconnectLock *sync.Mutex
|
||||
|
||||
// statusListeners will be informed when the connection status of the client changes
|
||||
statusListeners []StatusListener
|
||||
// connectionListeners will be informed when a connection to the server is established
|
||||
connectionListeners []ConnectionListener
|
||||
|
||||
// idManager tracks IDs for ddp messages
|
||||
idManager
|
||||
}
|
||||
|
||||
// NewClient creates a default client (using an internal websocket) to the
|
||||
// provided URL using the origin for the connection. The client will
|
||||
// automatically connect, upgrade to a websocket, and establish a DDP
|
||||
// connection session before returning the client. The client will
|
||||
// automatically and internally handle heartbeats and reconnects.
|
||||
//
|
||||
// TBD create an option to use an external websocket (aka htt.Transport)
|
||||
// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Tranport)
|
||||
// TBD create an option to hijack the connection (aka http.Hijacker)
|
||||
// TBD create profiling features (aka net/http/pprof)
|
||||
func NewClient(url, origin string) *Client {
|
||||
c := &Client{
|
||||
HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last)
|
||||
HeartbeatTimeout: 15 * time.Second, // Meteor impl default
|
||||
ReconnectInterval: 5 * time.Second,
|
||||
collections: map[string]Collection{},
|
||||
url: url,
|
||||
origin: origin,
|
||||
inbox: make(chan map[string]interface{}, 100),
|
||||
errors: make(chan error, 100),
|
||||
pings: map[string][]*pingTracker{},
|
||||
calls: map[string]*Call{},
|
||||
subs: map[string]*Call{},
|
||||
connectionStatus: DISCONNECTED,
|
||||
reconnectLock: &sync.Mutex{},
|
||||
|
||||
// Stats
|
||||
writeSocketStats: NewWriterStats(nil),
|
||||
writeStats: NewWriterStats(nil),
|
||||
readSocketStats: NewReaderStats(nil),
|
||||
readStats: NewReaderStats(nil),
|
||||
|
||||
// Loggers
|
||||
writeLog: NewWriterTextLogger(nil),
|
||||
readLog: NewReaderTextLogger(nil),
|
||||
|
||||
idManager: *newidManager(),
|
||||
}
|
||||
c.encoder = json.NewEncoder(c.writeStats)
|
||||
c.SetSocketLogActive(false)
|
||||
|
||||
// We spin off an inbox processing goroutine
|
||||
go c.inboxManager()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Session returns the negotiated session token for the connection.
|
||||
func (c *Client) Session() string {
|
||||
return c.session
|
||||
}
|
||||
|
||||
// Version returns the negotiated protocol version in use by the client.
|
||||
func (c *Client) Version() string {
|
||||
return c.version
|
||||
}
|
||||
|
||||
// AddStatusListener in order to receive status change updates.
|
||||
func (c *Client) AddStatusListener(listener StatusListener) {
|
||||
c.statusListeners = append(c.statusListeners, listener)
|
||||
}
|
||||
|
||||
// AddConnectionListener in order to receive connection updates.
|
||||
func (c *Client) AddConnectionListener(listener ConnectionListener) {
|
||||
c.connectionListeners = append(c.connectionListeners, listener)
|
||||
}
|
||||
|
||||
// status updates all status listeners with the new client status.
|
||||
func (c *Client) status(status int) {
|
||||
if c.connectionStatus == status {
|
||||
return
|
||||
}
|
||||
c.connectionStatus = status
|
||||
for _, listener := range c.statusListeners {
|
||||
listener.Status(status)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect attempts to connect the client to the server.
|
||||
func (c *Client) Connect() error {
|
||||
c.status(DIALING)
|
||||
ws, err := websocket.Dial(c.url, "", c.origin)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
log.Println("Dial error", err)
|
||||
c.reconnectLater()
|
||||
return err
|
||||
}
|
||||
// Start DDP connection
|
||||
c.start(ws, NewConnect())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reconnect attempts to reconnect the client to the server on the existing
|
||||
// DDP session.
|
||||
//
|
||||
// TODO needs a reconnect backoff so we don't trash a down server
|
||||
// TODO reconnect should not allow more reconnects while a reconnection is already in progress.
|
||||
func (c *Client) Reconnect() {
|
||||
func() {
|
||||
c.reconnectLock.Lock()
|
||||
defer c.reconnectLock.Unlock()
|
||||
if c.reconnectTimer != nil {
|
||||
c.reconnectTimer.Stop()
|
||||
c.reconnectTimer = nil
|
||||
}
|
||||
}()
|
||||
|
||||
c.Close()
|
||||
|
||||
c.reconnects++
|
||||
|
||||
// Reconnect
|
||||
c.status(DIALING)
|
||||
ws, err := websocket.Dial(c.url, "", c.origin)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
log.Println("Dial error", err)
|
||||
c.reconnectLater()
|
||||
return
|
||||
}
|
||||
|
||||
c.start(ws, NewReconnect(c.session))
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
// We resume inflight or ongoing subscriptions - we don't have to wait
|
||||
// for connection confirmation (messages can be pipelined).
|
||||
// --------------------------------------------------------------------
|
||||
|
||||
// Send calls that haven't been confirmed - may not have been sent
|
||||
// and effects should be idempotent
|
||||
for _, call := range c.calls {
|
||||
c.Send(NewMethod(call.ID, call.ServiceMethod, call.Args.([]interface{})))
|
||||
}
|
||||
|
||||
// Resend subscriptions and patch up collections
|
||||
for _, sub := range c.subs {
|
||||
c.Send(NewSub(sub.ID, sub.ServiceMethod, sub.Args.([]interface{})))
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe subscribes to data updates.
|
||||
func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call {
|
||||
|
||||
if args == nil {
|
||||
args = []interface{}{}
|
||||
}
|
||||
call := new(Call)
|
||||
call.ID = c.newID()
|
||||
call.ServiceMethod = subName
|
||||
call.Args = args
|
||||
call.Owner = c
|
||||
|
||||
if done == nil {
|
||||
done = make(chan *Call, 10) // buffered.
|
||||
} else {
|
||||
// If caller passes done != nil, it must arrange that
|
||||
// done has enough buffer for the number of simultaneous
|
||||
// RPCs that will be using that channel. If the channel
|
||||
// is totally unbuffered, it's best not to run at all.
|
||||
if cap(done) == 0 {
|
||||
log.Panic("ddp.rpc: done channel is unbuffered")
|
||||
}
|
||||
}
|
||||
call.Done = done
|
||||
c.subs[call.ID] = call
|
||||
|
||||
// Save this subscription to the client so we can reconnect
|
||||
subArgs := make([]interface{}, len(args))
|
||||
copy(subArgs, args)
|
||||
|
||||
c.Send(NewSub(call.ID, subName, args))
|
||||
|
||||
return call
|
||||
}
|
||||
|
||||
// Sub sends a synchronous subscription request to the server.
|
||||
func (c *Client) Sub(subName string, args ...interface{}) error {
|
||||
call := <-c.Subscribe(subName, make(chan *Call, 1), args...).Done
|
||||
return call.Error
|
||||
}
|
||||
|
||||
// Go invokes the function asynchronously. It returns the Call structure representing
|
||||
// the invocation. The done channel will signal when the call is complete by returning
|
||||
// the same Call object. If done is nil, Go will allocate a new channel.
|
||||
// If non-nil, done must be buffered or Go will deliberately crash.
|
||||
//
|
||||
// Go and Call are modeled after the standard `net/rpc` package versions.
|
||||
func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call {
|
||||
|
||||
if args == nil {
|
||||
args = []interface{}{}
|
||||
}
|
||||
call := new(Call)
|
||||
call.ID = c.newID()
|
||||
call.ServiceMethod = serviceMethod
|
||||
call.Args = args
|
||||
call.Owner = c
|
||||
if done == nil {
|
||||
done = make(chan *Call, 10) // buffered.
|
||||
} else {
|
||||
// If caller passes done != nil, it must arrange that
|
||||
// done has enough buffer for the number of simultaneous
|
||||
// RPCs that will be using that channel. If the channel
|
||||
// is totally unbuffered, it's best not to run at all.
|
||||
if cap(done) == 0 {
|
||||
log.Panic("ddp.rpc: done channel is unbuffered")
|
||||
}
|
||||
}
|
||||
call.Done = done
|
||||
c.calls[call.ID] = call
|
||||
|
||||
c.Send(NewMethod(call.ID, serviceMethod, args))
|
||||
|
||||
return call
|
||||
}
|
||||
|
||||
// Call invokes the named function, waits for it to complete, and returns its error status.
|
||||
func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error) {
|
||||
call := <-c.Go(serviceMethod, make(chan *Call, 1), args...).Done
|
||||
return call.Reply, call.Error
|
||||
}
|
||||
|
||||
// Ping sends a heartbeat signal to the server. The Ping doesn't look for
|
||||
// a response but may trigger the connection to reconnect if the ping timesout.
|
||||
// This is primarily useful for reviving an unresponsive Client connection.
|
||||
func (c *Client) Ping() {
|
||||
c.PingPong(c.newID(), c.HeartbeatTimeout, func(err error) {
|
||||
if err != nil {
|
||||
// Is there anything else we should or can do?
|
||||
c.reconnectLater()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PingPong sends a heartbeat signal to the server and calls the provided
|
||||
// function when a pong is received. An optional id can be sent to help
|
||||
// track the responses - or an empty string can be used. It is the
|
||||
// responsibility of the caller to respond to any errors that may occur.
|
||||
func (c *Client) PingPong(id string, timeout time.Duration, handler func(error)) {
|
||||
err := c.Send(NewPing(id))
|
||||
if err != nil {
|
||||
handler(err)
|
||||
return
|
||||
}
|
||||
c.pingsOut++
|
||||
pings, ok := c.pings[id]
|
||||
if !ok {
|
||||
pings = make([]*pingTracker, 0, 5)
|
||||
}
|
||||
tracker := &pingTracker{handler: handler, timeout: timeout, timer: time.AfterFunc(timeout, func() {
|
||||
handler(fmt.Errorf("ping timeout"))
|
||||
})}
|
||||
c.pings[id] = append(pings, tracker)
|
||||
}
|
||||
|
||||
// Send transmits messages to the server. The msg parameter must be json
|
||||
// encoder compatible.
|
||||
func (c *Client) Send(msg interface{}) error {
|
||||
return c.encoder.Encode(msg)
|
||||
}
|
||||
|
||||
// Close implements the io.Closer interface.
|
||||
func (c *Client) Close() {
|
||||
// Shutdown out all outstanding pings
|
||||
if c.pingTimer != nil {
|
||||
c.pingTimer.Stop()
|
||||
c.pingTimer = nil
|
||||
}
|
||||
|
||||
// Close websocket
|
||||
if c.ws != nil {
|
||||
c.ws.Close()
|
||||
c.ws = nil
|
||||
}
|
||||
for _, collection := range c.collections {
|
||||
collection.reset()
|
||||
}
|
||||
c.status(DISCONNECTED)
|
||||
}
|
||||
|
||||
// ResetStats resets the statistics for the client.
|
||||
func (c *Client) ResetStats() {
|
||||
c.readSocketStats.Reset()
|
||||
c.readStats.Reset()
|
||||
c.writeSocketStats.Reset()
|
||||
c.writeStats.Reset()
|
||||
c.reconnects = 0
|
||||
c.pingsIn = 0
|
||||
c.pingsOut = 0
|
||||
}
|
||||
|
||||
// Stats returns the read and write statistics of the client.
|
||||
func (c *Client) Stats() *ClientStats {
|
||||
return &ClientStats{
|
||||
Reads: c.readSocketStats.Snapshot(),
|
||||
TotalReads: c.readStats.Snapshot(),
|
||||
Writes: c.writeSocketStats.Snapshot(),
|
||||
TotalWrites: c.writeStats.Snapshot(),
|
||||
Reconnects: c.reconnects,
|
||||
PingsSent: c.pingsOut,
|
||||
PingsRecv: c.pingsIn,
|
||||
}
|
||||
}
|
||||
|
||||
// SocketLogActive returns the current logging status for the socket.
|
||||
func (c *Client) SocketLogActive() bool {
|
||||
return c.writeLog.Active
|
||||
}
|
||||
|
||||
// SetSocketLogActive to true to enable logging of raw socket data.
|
||||
func (c *Client) SetSocketLogActive(active bool) {
|
||||
c.writeLog.Active = active
|
||||
c.readLog.Active = active
|
||||
}
|
||||
|
||||
// CollectionByName retrieves a collection by it's name.
|
||||
func (c *Client) CollectionByName(name string) Collection {
|
||||
collection, ok := c.collections[name]
|
||||
if !ok {
|
||||
collection = NewCollection(name)
|
||||
c.collections[name] = collection
|
||||
}
|
||||
return collection
|
||||
}
|
||||
|
||||
// CollectionStats returns a snapshot of statistics for the currently known collections.
|
||||
func (c *Client) CollectionStats() []CollectionStats {
|
||||
stats := make([]CollectionStats, 0, len(c.collections))
|
||||
for name, collection := range c.collections {
|
||||
stats = append(stats, CollectionStats{Name: name, Count: len(collection.FindAll())})
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// start starts a new client connection on the provided websocket
|
||||
func (c *Client) start(ws *websocket.Conn, connect *Connect) {
|
||||
|
||||
c.status(CONNECTING)
|
||||
|
||||
c.ws = ws
|
||||
c.writeLog.SetWriter(ws)
|
||||
c.writeSocketStats = NewWriterStats(c.writeLog)
|
||||
c.writeStats.SetWriter(c.writeSocketStats)
|
||||
c.readLog.SetReader(ws)
|
||||
c.readSocketStats = NewReaderStats(c.readLog)
|
||||
c.readStats.SetReader(c.readSocketStats)
|
||||
|
||||
// We spin off an inbox stuffing goroutine
|
||||
go c.inboxWorker(c.readStats)
|
||||
|
||||
c.Send(connect)
|
||||
}
|
||||
|
||||
// inboxManager pulls messages from the inbox and routes them to appropriate
|
||||
// handlers.
|
||||
func (c *Client) inboxManager() {
|
||||
for {
|
||||
select {
|
||||
case msg := <-c.inbox:
|
||||
// Message!
|
||||
//log.Println("Got message", msg)
|
||||
mtype, ok := msg["msg"]
|
||||
if ok {
|
||||
switch mtype.(string) {
|
||||
// Connection management
|
||||
case "connected":
|
||||
c.status(CONNECTED)
|
||||
for _, collection := range c.collections {
|
||||
collection.init()
|
||||
}
|
||||
c.version = "1" // Currently the only version we support
|
||||
c.session = msg["session"].(string)
|
||||
// Start automatic heartbeats
|
||||
c.pingTimer = time.AfterFunc(c.HeartbeatInterval, func() {
|
||||
c.Ping()
|
||||
c.pingTimer.Reset(c.HeartbeatInterval)
|
||||
})
|
||||
// Notify connection listeners
|
||||
for _, listener := range c.connectionListeners {
|
||||
go listener.Connected()
|
||||
}
|
||||
case "failed":
|
||||
log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])
|
||||
|
||||
// Heartbeats
|
||||
case "ping":
|
||||
// We received a ping - need to respond with a pong
|
||||
id, ok := msg["id"]
|
||||
if ok {
|
||||
c.Send(NewPong(id.(string)))
|
||||
} else {
|
||||
c.Send(NewPong(""))
|
||||
}
|
||||
c.pingsIn++
|
||||
case "pong":
|
||||
// We received a pong - we can clear the ping tracker and call its handler
|
||||
id, ok := msg["id"]
|
||||
var key string
|
||||
if ok {
|
||||
key = id.(string)
|
||||
}
|
||||
pings, ok := c.pings[key]
|
||||
if ok && len(pings) > 0 {
|
||||
ping := pings[0]
|
||||
pings = pings[1:]
|
||||
if len(key) == 0 || len(pings) > 0 {
|
||||
c.pings[key] = pings
|
||||
}
|
||||
ping.timer.Stop()
|
||||
ping.handler(nil)
|
||||
}
|
||||
|
||||
// Live Data
|
||||
case "nosub":
|
||||
log.Println("Subscription returned a nosub error", msg)
|
||||
// Clear related subscriptions
|
||||
sub, ok := msg["id"]
|
||||
if ok {
|
||||
id := sub.(string)
|
||||
runningSub := c.subs[id]
|
||||
|
||||
if runningSub != nil {
|
||||
runningSub.Error = errors.New("Subscription returned a nosub error")
|
||||
runningSub.done()
|
||||
delete(c.subs, id)
|
||||
}
|
||||
}
|
||||
case "ready":
|
||||
// Run 'done' callbacks on all ready subscriptions
|
||||
subs, ok := msg["subs"]
|
||||
if ok {
|
||||
for _, sub := range subs.([]interface{}) {
|
||||
call, ok := c.subs[sub.(string)]
|
||||
if ok {
|
||||
call.done()
|
||||
}
|
||||
}
|
||||
}
|
||||
case "added":
|
||||
c.collectionBy(msg).added(msg)
|
||||
case "changed":
|
||||
c.collectionBy(msg).changed(msg)
|
||||
case "removed":
|
||||
c.collectionBy(msg).removed(msg)
|
||||
case "addedBefore":
|
||||
c.collectionBy(msg).addedBefore(msg)
|
||||
case "movedBefore":
|
||||
c.collectionBy(msg).movedBefore(msg)
|
||||
|
||||
// RPC
|
||||
case "result":
|
||||
id, ok := msg["id"]
|
||||
if ok {
|
||||
call := c.calls[id.(string)]
|
||||
delete(c.calls, id.(string))
|
||||
e, ok := msg["error"]
|
||||
if ok {
|
||||
txt, _ := json.Marshal(e)
|
||||
call.Error = fmt.Errorf(string(txt))
|
||||
call.Reply = e
|
||||
} else {
|
||||
call.Reply = msg["result"]
|
||||
}
|
||||
call.done()
|
||||
}
|
||||
case "updated":
|
||||
// We currently don't do anything with updated status
|
||||
|
||||
default:
|
||||
// Ignore?
|
||||
log.Println("Server sent unexpected message", msg)
|
||||
}
|
||||
} else {
|
||||
// Current Meteor server sends an undocumented DDP message
|
||||
// (looks like clustering "hint"). We will register and
|
||||
// ignore rather than log an error.
|
||||
serverID, ok := msg["server_id"]
|
||||
if ok {
|
||||
switch ID := serverID.(type) {
|
||||
case string:
|
||||
c.serverID = ID
|
||||
default:
|
||||
log.Println("Server cluster node", serverID)
|
||||
}
|
||||
} else {
|
||||
log.Println("Server sent message with no `msg` field", msg)
|
||||
}
|
||||
}
|
||||
case err := <-c.errors:
|
||||
log.Println("Websocket error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) collectionBy(msg map[string]interface{}) Collection {
|
||||
n, ok := msg["collection"]
|
||||
if !ok {
|
||||
return NewMockCollection()
|
||||
}
|
||||
switch name := n.(type) {
|
||||
case string:
|
||||
return c.CollectionByName(name)
|
||||
default:
|
||||
return NewMockCollection()
|
||||
}
|
||||
}
|
||||
|
||||
// inboxWorker pulls messages from a websocket, decodes JSON packets, and
|
||||
// stuffs them into a message channel.
|
||||
func (c *Client) inboxWorker(ws io.Reader) {
|
||||
dec := json.NewDecoder(ws)
|
||||
for {
|
||||
var event interface{}
|
||||
|
||||
if err := dec.Decode(&event); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
c.errors <- err
|
||||
}
|
||||
if c.pingTimer != nil {
|
||||
c.pingTimer.Reset(c.HeartbeatInterval)
|
||||
}
|
||||
if event == nil {
|
||||
log.Println("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
|
||||
break
|
||||
} else {
|
||||
c.inbox <- event.(map[string]interface{})
|
||||
}
|
||||
}
|
||||
|
||||
c.reconnectLater()
|
||||
}
|
||||
|
||||
// reconnectLater schedules a reconnect for later. We need to make sure that we don't
|
||||
// block, and that we don't reconnect more frequently than once every c.ReconnectInterval
|
||||
func (c *Client) reconnectLater() {
|
||||
c.Close()
|
||||
c.reconnectLock.Lock()
|
||||
defer c.reconnectLock.Unlock()
|
||||
if c.reconnectTimer == nil {
|
||||
c.reconnectTimer = time.AfterFunc(c.ReconnectInterval, c.Reconnect)
|
||||
}
|
||||
}
|
245
vendor/github.com/gopackage/ddp/ddp_collection.go
generated
vendored
Normal file
245
vendor/github.com/gopackage/ddp/ddp_collection.go
generated
vendored
Normal file
@ -0,0 +1,245 @@
|
||||
package ddp
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// Collection
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
type Update map[string]interface{}
|
||||
type UpdateListener interface {
|
||||
CollectionUpdate(collection, operation, id string, doc Update)
|
||||
}
|
||||
|
||||
// Collection managed cached collection data sent from the server in a
|
||||
// livedata subscription.
|
||||
//
|
||||
// It would be great to build an entire mongo compatible local store (minimongo)
|
||||
type Collection interface {
|
||||
|
||||
// FindOne queries objects and returns the first match.
|
||||
FindOne(id string) Update
|
||||
// FindAll returns a map of all items in the cache - this is a hack
|
||||
// until we have time to build out a real minimongo interface.
|
||||
FindAll() map[string]Update
|
||||
// AddUpdateListener adds a channel that receives update messages.
|
||||
AddUpdateListener(listener UpdateListener)
|
||||
|
||||
// livedata updates
|
||||
added(msg Update)
|
||||
changed(msg Update)
|
||||
removed(msg Update)
|
||||
addedBefore(msg Update)
|
||||
movedBefore(msg Update)
|
||||
init() // init informs the collection that the connection to the server has begun/resumed
|
||||
reset() // reset informs the collection that the connection to the server has been lost
|
||||
}
|
||||
|
||||
// NewMockCollection creates an empty collection that does nothing.
|
||||
func NewMockCollection() Collection {
|
||||
return &MockCache{}
|
||||
}
|
||||
|
||||
// NewCollection creates a new collection - always KeyCache.
|
||||
func NewCollection(name string) Collection {
|
||||
return &KeyCache{name, make(map[string]Update), nil}
|
||||
}
|
||||
|
||||
// KeyCache caches items keyed on unique ID.
|
||||
type KeyCache struct {
|
||||
// The name of the collection
|
||||
Name string
|
||||
// items contains collection items by ID
|
||||
items map[string]Update
|
||||
// listeners contains all the listeners that should be notified of collection updates.
|
||||
listeners []UpdateListener
|
||||
// TODO(badslug): do we need to protect from multiple threads
|
||||
}
|
||||
|
||||
func (c *KeyCache) added(msg Update) {
|
||||
id, fields := parseUpdate(msg)
|
||||
if fields != nil {
|
||||
c.items[id] = fields
|
||||
c.notify("create", id, fields)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *KeyCache) changed(msg Update) {
|
||||
id, fields := parseUpdate(msg)
|
||||
if fields != nil {
|
||||
item, ok := c.items[id]
|
||||
if ok {
|
||||
for key, value := range fields {
|
||||
item[key] = value
|
||||
}
|
||||
c.items[id] = item
|
||||
c.notify("update", id, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *KeyCache) removed(msg Update) {
|
||||
id, _ := parseUpdate(msg)
|
||||
if len(id) > 0 {
|
||||
delete(c.items, id)
|
||||
c.notify("remove", id, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *KeyCache) addedBefore(msg Update) {
|
||||
// for keyed cache, ordered commands are a noop
|
||||
}
|
||||
|
||||
func (c *KeyCache) movedBefore(msg Update) {
|
||||
// for keyed cache, ordered commands are a noop
|
||||
}
|
||||
|
||||
// init prepares the collection for data updates (called when a new connection is
|
||||
// made or a connection/session is resumed).
|
||||
func (c *KeyCache) init() {
|
||||
// TODO start to patch up the current data with fresh server state
|
||||
}
|
||||
|
||||
func (c *KeyCache) reset() {
|
||||
// TODO we should mark the collection but maintain it's contents and then
|
||||
// patch up the current contents with the new contents when we receive them.
|
||||
//c.items = nil
|
||||
c.notify("reset", "", nil)
|
||||
}
|
||||
|
||||
// notify sends a Update to all UpdateListener's which should never block.
|
||||
func (c *KeyCache) notify(operation, id string, doc Update) {
|
||||
for _, listener := range c.listeners {
|
||||
listener.CollectionUpdate(c.Name, operation, id, doc)
|
||||
}
|
||||
}
|
||||
|
||||
// FindOne returns the item with matching id.
|
||||
func (c *KeyCache) FindOne(id string) Update {
|
||||
return c.items[id]
|
||||
}
|
||||
|
||||
// FindAll returns a dump of all items in the collection
|
||||
func (c *KeyCache) FindAll() map[string]Update {
|
||||
return c.items
|
||||
}
|
||||
|
||||
// AddUpdateListener adds a listener for changes on a collection.
|
||||
func (c *KeyCache) AddUpdateListener(listener UpdateListener) {
|
||||
c.listeners = append(c.listeners, listener)
|
||||
}
|
||||
|
||||
// OrderedCache caches items based on list order.
|
||||
// This is a placeholder, currently not implemented as the Meteor server
|
||||
// does not transmit ordered collections over DDP yet.
|
||||
type OrderedCache struct {
|
||||
// ranks contains ordered collection items for ordered collections
|
||||
items []interface{}
|
||||
}
|
||||
|
||||
func (c *OrderedCache) added(msg Update) {
|
||||
// for ordered cache, key commands are a noop
|
||||
}
|
||||
|
||||
func (c *OrderedCache) changed(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *OrderedCache) removed(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *OrderedCache) addedBefore(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *OrderedCache) movedBefore(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *OrderedCache) init() {
|
||||
|
||||
}
|
||||
|
||||
func (c *OrderedCache) reset() {
|
||||
|
||||
}
|
||||
|
||||
// FindOne returns the item with matching id.
|
||||
func (c *OrderedCache) FindOne(id string) Update {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindAll returns a dump of all items in the collection
|
||||
func (c *OrderedCache) FindAll() map[string]Update {
|
||||
return map[string]Update{}
|
||||
}
|
||||
|
||||
// AddUpdateListener does nothing.
|
||||
func (c *OrderedCache) AddUpdateListener(ch UpdateListener) {
|
||||
}
|
||||
|
||||
// MockCache implements the Collection interface but does nothing with the data.
|
||||
type MockCache struct {
|
||||
}
|
||||
|
||||
func (c *MockCache) added(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) changed(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) removed(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) addedBefore(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) movedBefore(msg Update) {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) init() {
|
||||
|
||||
}
|
||||
|
||||
func (c *MockCache) reset() {
|
||||
|
||||
}
|
||||
|
||||
// FindOne returns the item with matching id.
|
||||
func (c *MockCache) FindOne(id string) Update {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FindAll returns a dump of all items in the collection
|
||||
func (c *MockCache) FindAll() map[string]Update {
|
||||
return map[string]Update{}
|
||||
}
|
||||
|
||||
// AddUpdateListener does nothing.
|
||||
func (c *MockCache) AddUpdateListener(ch UpdateListener) {
|
||||
}
|
||||
|
||||
// parseUpdate returns the ID and fields from a DDP Update document.
|
||||
func parseUpdate(up Update) (ID string, Fields Update) {
|
||||
key, ok := up["id"]
|
||||
if ok {
|
||||
switch id := key.(type) {
|
||||
case string:
|
||||
updates, ok := up["fields"]
|
||||
if ok {
|
||||
switch fields := updates.(type) {
|
||||
case map[string]interface{}:
|
||||
return id, Update(fields)
|
||||
default:
|
||||
// Don't know what to do...
|
||||
}
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
217
vendor/github.com/gopackage/ddp/ddp_ejson.go
generated
vendored
Normal file
217
vendor/github.com/gopackage/ddp/ddp_ejson.go
generated
vendored
Normal file
@ -0,0 +1,217 @@
|
||||
package ddp
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// EJSON document interface
|
||||
// ----------------------------------------------------------------------
|
||||
// https://github.com/meteor/meteor/blob/devel/packages/ddp/DDP.md#appendix-ejson
|
||||
|
||||
// Doc provides hides the complexity of ejson documents.
|
||||
type Doc struct {
|
||||
root interface{}
|
||||
}
|
||||
|
||||
// NewDoc creates a new document from a generic json parsed document.
|
||||
func NewDoc(in interface{}) *Doc {
|
||||
doc := &Doc{in}
|
||||
return doc
|
||||
}
|
||||
|
||||
// Map locates a map[string]interface{} - json object - at a path
|
||||
// or returns nil if not found.
|
||||
func (d *Doc) Map(path string) map[string]interface{} {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case map[string]interface{}:
|
||||
return m
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Array locates an []interface{} - json array - at a path
|
||||
// or returns nil if not found.
|
||||
func (d *Doc) Array(path string) []interface{} {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case []interface{}:
|
||||
return m
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StringArray locates an []string - json array of strings - at a path
|
||||
// or returns nil if not found. The string array will contain all string values
|
||||
// in the array and skip any non-string entries.
|
||||
func (d *Doc) StringArray(path string) []string {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case []interface{}:
|
||||
items := []string{}
|
||||
for _, item := range m {
|
||||
switch val := item.(type) {
|
||||
case string:
|
||||
items = append(items, val)
|
||||
}
|
||||
}
|
||||
return items
|
||||
case []string:
|
||||
return m
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string value located at the path or an empty string if not found.
|
||||
func (d *Doc) String(path string) string {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case string:
|
||||
return m
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Bool returns a boolean value located at the path or false if not found.
|
||||
func (d *Doc) Bool(path string) bool {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case bool:
|
||||
return m
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Float returns a float64 value located at the path or zero if not found.
|
||||
func (d *Doc) Float(path string) float64 {
|
||||
item := d.Item(path)
|
||||
if item != nil {
|
||||
switch m := item.(type) {
|
||||
case float64:
|
||||
return m
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// Time returns a time value located at the path or nil if not found.
|
||||
func (d *Doc) Time(path string) time.Time {
|
||||
ticks := d.Float(path + ".$date")
|
||||
var t time.Time
|
||||
if ticks > 0 {
|
||||
sec := int64(ticks / 1000)
|
||||
t = time.Unix(int64(sec), 0)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
// Item locates a "raw" item at the provided path, returning
|
||||
// the item found or nil if not found.
|
||||
func (d *Doc) Item(path string) interface{} {
|
||||
item := d.root
|
||||
steps := strings.Split(path, ".")
|
||||
for _, step := range steps {
|
||||
// This is an intermediate step - we must be in a map
|
||||
switch m := item.(type) {
|
||||
case map[string]interface{}:
|
||||
item = m[step]
|
||||
case Update:
|
||||
item = m[step]
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// Set a value for a path. Intermediate items are created as necessary.
|
||||
func (d *Doc) Set(path string, value interface{}) {
|
||||
item := d.root
|
||||
steps := strings.Split(path, ".")
|
||||
last := steps[len(steps)-1]
|
||||
steps = steps[:len(steps)-1]
|
||||
for _, step := range steps {
|
||||
// This is an intermediate step - we must be in a map
|
||||
switch m := item.(type) {
|
||||
case map[string]interface{}:
|
||||
item = m[step]
|
||||
if item == nil {
|
||||
item = map[string]interface{}{}
|
||||
m[step] = item
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
// Item is now the last map so we just set the value
|
||||
switch m := item.(type) {
|
||||
case map[string]interface{}:
|
||||
m[last] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Accounts password login support
|
||||
type Login struct {
|
||||
User *User `json:"user"`
|
||||
Password *Password `json:"password"`
|
||||
}
|
||||
|
||||
func NewEmailLogin(email, pass string) *Login {
|
||||
return &Login{User: &User{Email: email}, Password: NewPassword(pass)}
|
||||
}
|
||||
|
||||
func NewUsernameLogin(user, pass string) *Login {
|
||||
return &Login{User: &User{Username: user}, Password: NewPassword(pass)}
|
||||
}
|
||||
|
||||
type LoginResume struct {
|
||||
Token string `json:"resume"`
|
||||
}
|
||||
|
||||
func NewLoginResume(token string) *LoginResume {
|
||||
return &LoginResume{Token: token}
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Email string `json:"email,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
}
|
||||
|
||||
type Password struct {
|
||||
Digest string `json:"digest"`
|
||||
Algorithm string `json:"algorithm"`
|
||||
}
|
||||
|
||||
func NewPassword(pass string) *Password {
|
||||
sha := sha256.New()
|
||||
io.WriteString(sha, pass)
|
||||
digest := sha.Sum(nil)
|
||||
return &Password{Digest: hex.EncodeToString(digest), Algorithm: "sha-256"}
|
||||
}
|
82
vendor/github.com/gopackage/ddp/ddp_messages.go
generated
vendored
Normal file
82
vendor/github.com/gopackage/ddp/ddp_messages.go
generated
vendored
Normal file
@ -0,0 +1,82 @@
|
||||
package ddp
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// DDP Messages
|
||||
//
|
||||
// Go structs representing DDP raw messages ready for JSON
|
||||
// encoding.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
// Message contains the common fields that all DDP messages use.
|
||||
type Message struct {
|
||||
Type string `json:"msg"`
|
||||
ID string `json:"id,omitempty"`
|
||||
}
|
||||
|
||||
// Connect represents a DDP connect message.
|
||||
type Connect struct {
|
||||
Message
|
||||
Version string `json:"version"`
|
||||
Support []string `json:"support"`
|
||||
Session string `json:"session,omitempty"`
|
||||
}
|
||||
|
||||
// NewConnect creates a new connect message
|
||||
func NewConnect() *Connect {
|
||||
return &Connect{Message: Message{Type: "connect"}, Version: "1", Support: []string{"1"}}
|
||||
}
|
||||
|
||||
// NewReconnect creates a new connect message with a session ID to resume.
|
||||
func NewReconnect(session string) *Connect {
|
||||
c := NewConnect()
|
||||
c.Session = session
|
||||
return c
|
||||
}
|
||||
|
||||
// Ping represents a DDP ping message.
|
||||
type Ping Message
|
||||
|
||||
// NewPing creates a new ping message with optional ID.
|
||||
func NewPing(id string) *Ping {
|
||||
return &Ping{Type: "ping", ID: id}
|
||||
}
|
||||
|
||||
// Pong represents a DDP pong message.
|
||||
type Pong Message
|
||||
|
||||
// NewPong creates a new pong message with optional ID.
|
||||
func NewPong(id string) *Pong {
|
||||
return &Pong{Type: "pong", ID: id}
|
||||
}
|
||||
|
||||
// Method is used to send a remote procedure call to the server.
|
||||
type Method struct {
|
||||
Message
|
||||
ServiceMethod string `json:"method"`
|
||||
Args []interface{} `json:"params"`
|
||||
}
|
||||
|
||||
// NewMethod creates a new method invocation object.
|
||||
func NewMethod(id, serviceMethod string, args []interface{}) *Method {
|
||||
return &Method{
|
||||
Message: Message{Type: "method", ID: id},
|
||||
ServiceMethod: serviceMethod,
|
||||
Args: args,
|
||||
}
|
||||
}
|
||||
|
||||
// Sub is used to send a subscription request to the server.
|
||||
type Sub struct {
|
||||
Message
|
||||
SubName string `json:"name"`
|
||||
Args []interface{} `json:"params"`
|
||||
}
|
||||
|
||||
// NewSub creates a new sub object.
|
||||
func NewSub(id, subName string, args []interface{}) *Sub {
|
||||
return &Sub{
|
||||
Message: Message{Type: "sub", ID: id},
|
||||
SubName: subName,
|
||||
Args: args,
|
||||
}
|
||||
}
|
321
vendor/github.com/gopackage/ddp/ddp_stats.go
generated
vendored
Normal file
321
vendor/github.com/gopackage/ddp/ddp_stats.go
generated
vendored
Normal file
@ -0,0 +1,321 @@
|
||||
package ddp
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Gather statistics about a DDP connection.
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// io utilities
|
||||
//
|
||||
// This is generic - should be moved into a stand alone lib
|
||||
// ---------------------------------------------------------
|
||||
|
||||
// ReaderProxy provides common tooling for structs that manage an io.Reader.
|
||||
type ReaderProxy struct {
|
||||
reader io.Reader
|
||||
}
|
||||
|
||||
// NewReaderProxy creates a new proxy for the provided reader.
|
||||
func NewReaderProxy(reader io.Reader) *ReaderProxy {
|
||||
return &ReaderProxy{reader}
|
||||
}
|
||||
|
||||
// SetReader sets the reader on the proxy.
|
||||
func (r *ReaderProxy) SetReader(reader io.Reader) {
|
||||
r.reader = reader
|
||||
}
|
||||
|
||||
// WriterProxy provides common tooling for structs that manage an io.Writer.
|
||||
type WriterProxy struct {
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
// NewWriterProxy creates a new proxy for the provided writer.
|
||||
func NewWriterProxy(writer io.Writer) *WriterProxy {
|
||||
return &WriterProxy{writer}
|
||||
}
|
||||
|
||||
// SetWriter sets the writer on the proxy.
|
||||
func (w *WriterProxy) SetWriter(writer io.Writer) {
|
||||
w.writer = writer
|
||||
}
|
||||
|
||||
// Logging data types
|
||||
const (
|
||||
DataByte = iota // data is raw []byte
|
||||
DataText // data is string data
|
||||
)
|
||||
|
||||
// Logger logs data from i/o sources.
|
||||
type Logger struct {
|
||||
// Active is true if the logger should be logging reads
|
||||
Active bool
|
||||
// Truncate is >0 to indicate the number of characters to truncate output
|
||||
Truncate int
|
||||
|
||||
logger *log.Logger
|
||||
dtype int
|
||||
}
|
||||
|
||||
// NewLogger creates a new i/o logger.
|
||||
func NewLogger(logger *log.Logger, active bool, dataType int, truncate int) Logger {
|
||||
return Logger{logger: logger, Active: active, dtype: dataType, Truncate: truncate}
|
||||
}
|
||||
|
||||
// Log logs the current i/o operation and returns the read and error for
|
||||
// easy call chaining.
|
||||
func (l *Logger) Log(p []byte, n int, err error) (int, error) {
|
||||
if l.Active && err == nil {
|
||||
limit := n
|
||||
truncated := false
|
||||
if l.Truncate > 0 && l.Truncate < limit {
|
||||
limit = l.Truncate
|
||||
truncated = true
|
||||
}
|
||||
switch l.dtype {
|
||||
case DataText:
|
||||
if truncated {
|
||||
l.logger.Printf("[%d] %s...", n, string(p[:limit]))
|
||||
} else {
|
||||
l.logger.Printf("[%d] %s", n, string(p[:limit]))
|
||||
}
|
||||
case DataByte:
|
||||
fallthrough
|
||||
default:
|
||||
l.logger.Println(hex.Dump(p[:limit]))
|
||||
}
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ReaderLogger logs data from any io.Reader.
|
||||
// ReaderLogger wraps a Reader and passes data to the actual data consumer.
|
||||
type ReaderLogger struct {
|
||||
Logger
|
||||
ReaderProxy
|
||||
}
|
||||
|
||||
// NewReaderDataLogger creates an active binary data logger with a default
|
||||
// log.Logger and a '->' prefix.
|
||||
func NewReaderDataLogger(reader io.Reader) *ReaderLogger {
|
||||
logger := log.New(os.Stdout, "<- ", log.LstdFlags)
|
||||
return NewReaderLogger(reader, logger, true, DataByte, 0)
|
||||
}
|
||||
|
||||
// NewReaderTextLogger creates an active binary data logger with a default
|
||||
// log.Logger and a '->' prefix.
|
||||
func NewReaderTextLogger(reader io.Reader) *ReaderLogger {
|
||||
logger := log.New(os.Stdout, "<- ", log.LstdFlags)
|
||||
return NewReaderLogger(reader, logger, true, DataText, 80)
|
||||
}
|
||||
|
||||
// NewReaderLogger creates a Reader logger for the provided parameters.
|
||||
func NewReaderLogger(reader io.Reader, logger *log.Logger, active bool, dataType int, truncate int) *ReaderLogger {
|
||||
return &ReaderLogger{ReaderProxy: *NewReaderProxy(reader), Logger: NewLogger(logger, active, dataType, truncate)}
|
||||
}
|
||||
|
||||
// Read logs the read bytes and passes them to the wrapped reader.
|
||||
func (r *ReaderLogger) Read(p []byte) (int, error) {
|
||||
n, err := r.reader.Read(p)
|
||||
return r.Log(p, n, err)
|
||||
}
|
||||
|
||||
// WriterLogger logs data from any io.Writer.
|
||||
// WriterLogger wraps a Writer and passes data to the actual data producer.
|
||||
type WriterLogger struct {
|
||||
Logger
|
||||
WriterProxy
|
||||
}
|
||||
|
||||
// NewWriterDataLogger creates an active binary data logger with a default
|
||||
// log.Logger and a '->' prefix.
|
||||
func NewWriterDataLogger(writer io.Writer) *WriterLogger {
|
||||
logger := log.New(os.Stdout, "-> ", log.LstdFlags)
|
||||
return NewWriterLogger(writer, logger, true, DataByte, 0)
|
||||
}
|
||||
|
||||
// NewWriterTextLogger creates an active binary data logger with a default
|
||||
// log.Logger and a '->' prefix.
|
||||
func NewWriterTextLogger(writer io.Writer) *WriterLogger {
|
||||
logger := log.New(os.Stdout, "-> ", log.LstdFlags)
|
||||
return NewWriterLogger(writer, logger, true, DataText, 80)
|
||||
}
|
||||
|
||||
// NewWriterLogger creates a Reader logger for the provided parameters.
|
||||
func NewWriterLogger(writer io.Writer, logger *log.Logger, active bool, dataType int, truncate int) *WriterLogger {
|
||||
return &WriterLogger{WriterProxy: *NewWriterProxy(writer), Logger: NewLogger(logger, active, dataType, truncate)}
|
||||
}
|
||||
|
||||
// Write logs the written bytes and passes them to the wrapped writer.
|
||||
func (w *WriterLogger) Write(p []byte) (int, error) {
|
||||
if w.writer != nil {
|
||||
n, err := w.writer.Write(p)
|
||||
return w.Log(p, n, err)
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Stats tracks statistics for i/o operations. Stats are produced from a
|
||||
// of a running stats agent.
|
||||
type Stats struct {
|
||||
// Bytes is the total number of bytes transferred.
|
||||
Bytes int64
|
||||
// Ops is the total number of i/o operations performed.
|
||||
Ops int64
|
||||
// Errors is the total number of i/o errors encountered.
|
||||
Errors int64
|
||||
// Runtime is the duration that stats have been gathered.
|
||||
Runtime time.Duration
|
||||
}
|
||||
|
||||
// ClientStats displays combined statistics for the Client.
|
||||
type ClientStats struct {
|
||||
// Reads provides statistics on the raw i/o network reads for the current connection.
|
||||
Reads *Stats
|
||||
// Reads provides statistics on the raw i/o network reads for the all client connections.
|
||||
TotalReads *Stats
|
||||
// Writes provides statistics on the raw i/o network writes for the current connection.
|
||||
Writes *Stats
|
||||
// Writes provides statistics on the raw i/o network writes for all the client connections.
|
||||
TotalWrites *Stats
|
||||
// Reconnects is the number of reconnections the client has made.
|
||||
Reconnects int64
|
||||
// PingsSent is the number of pings sent by the client
|
||||
PingsSent int64
|
||||
// PingsRecv is the number of pings received by the client
|
||||
PingsRecv int64
|
||||
}
|
||||
|
||||
// String produces a compact string representation of the client stats.
|
||||
func (stats *ClientStats) String() string {
|
||||
i := stats.Reads
|
||||
ti := stats.TotalReads
|
||||
o := stats.Writes
|
||||
to := stats.TotalWrites
|
||||
totalRun := (ti.Runtime * 1000000) / 1000000
|
||||
run := (i.Runtime * 1000000) / 1000000
|
||||
return fmt.Sprintf("bytes: %d/%d##%d/%d ops: %d/%d##%d/%d err: %d/%d##%d/%d reconnects: %d pings: %d/%d uptime: %v##%v",
|
||||
i.Bytes, o.Bytes,
|
||||
ti.Bytes, to.Bytes,
|
||||
i.Ops, o.Ops,
|
||||
ti.Ops, to.Ops,
|
||||
i.Errors, o.Errors,
|
||||
ti.Errors, to.Errors,
|
||||
stats.Reconnects,
|
||||
stats.PingsRecv, stats.PingsSent,
|
||||
run, totalRun)
|
||||
}
|
||||
|
||||
// CollectionStats combines statistics about a collection.
|
||||
type CollectionStats struct {
|
||||
Name string // Name of the collection
|
||||
Count int // Count is the total number of documents in the collection
|
||||
}
|
||||
|
||||
// String produces a compact string representation of the collection stat.
|
||||
func (s *CollectionStats) String() string {
|
||||
return fmt.Sprintf("%s[%d]", s.Name, s.Count)
|
||||
}
|
||||
|
||||
// StatsTracker provides the basic tooling for tracking i/o stats.
|
||||
type StatsTracker struct {
|
||||
bytes int64
|
||||
ops int64
|
||||
errors int64
|
||||
start time.Time
|
||||
lock *sync.Mutex
|
||||
}
|
||||
|
||||
// NewStatsTracker create a new stats tracker with start time set to now.
|
||||
func NewStatsTracker() *StatsTracker {
|
||||
return &StatsTracker{start: time.Now(), lock: new(sync.Mutex)}
|
||||
}
|
||||
|
||||
// Op records an i/o operation. The parameters are passed through to
|
||||
// allow easy chaining.
|
||||
func (t *StatsTracker) Op(n int, err error) (int, error) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.ops++
|
||||
if err == nil {
|
||||
t.bytes += int64(n)
|
||||
} else {
|
||||
if err == io.EOF {
|
||||
// I don't think we should log EOF stats as an error
|
||||
} else {
|
||||
t.errors++
|
||||
}
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Snapshot takes a snapshot of the current reader statistics.
|
||||
func (t *StatsTracker) Snapshot() *Stats {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
return t.snap()
|
||||
}
|
||||
|
||||
// Reset sets all of the stats to initial values.
|
||||
func (t *StatsTracker) Reset() *Stats {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
stats := t.snap()
|
||||
t.bytes = 0
|
||||
t.ops = 0
|
||||
t.errors = 0
|
||||
t.start = time.Now()
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
func (t *StatsTracker) snap() *Stats {
|
||||
return &Stats{Bytes: t.bytes, Ops: t.ops, Errors: t.errors, Runtime: time.Since(t.start)}
|
||||
}
|
||||
|
||||
// ReaderStats tracks statistics on any io.Reader.
|
||||
// ReaderStats wraps a Reader and passes data to the actual data consumer.
|
||||
type ReaderStats struct {
|
||||
StatsTracker
|
||||
ReaderProxy
|
||||
}
|
||||
|
||||
// NewReaderStats creates a ReaderStats object for the provided reader.
|
||||
func NewReaderStats(reader io.Reader) *ReaderStats {
|
||||
return &ReaderStats{ReaderProxy: *NewReaderProxy(reader), StatsTracker: *NewStatsTracker()}
|
||||
}
|
||||
|
||||
// Read passes through a read collecting statistics and logging activity.
|
||||
func (r *ReaderStats) Read(p []byte) (int, error) {
|
||||
return r.Op(r.reader.Read(p))
|
||||
}
|
||||
|
||||
// WriterStats tracks statistics on any io.Writer.
|
||||
// WriterStats wraps a Writer and passes data to the actual data producer.
|
||||
type WriterStats struct {
|
||||
StatsTracker
|
||||
WriterProxy
|
||||
}
|
||||
|
||||
// NewWriterStats creates a WriterStats object for the provided writer.
|
||||
func NewWriterStats(writer io.Writer) *WriterStats {
|
||||
return &WriterStats{WriterProxy: *NewWriterProxy(writer), StatsTracker: *NewStatsTracker()}
|
||||
}
|
||||
|
||||
// Write passes through a write collecting statistics.
|
||||
func (w *WriterStats) Write(p []byte) (int, error) {
|
||||
if w.writer != nil {
|
||||
return w.Op(w.writer.Write(p))
|
||||
}
|
||||
return 0, nil
|
||||
}
|
Reference in New Issue
Block a user