mirror of
https://github.com/cwinfo/matterbridge.git
synced 2024-11-28 19:31:36 +00:00
19d47784bd
Webhooks don't support the threading yet, so this is token only. In discord you can reply on each message of a thread, but this is not possible in mattermost (so some changes added there to make sure we always answer on the rootID of the thread). Also needs some more testing with slack. update : It now also uses the token when replying to a thread (even if webhooks are enabled), until webhooks have support for threads.
192 lines
5.1 KiB
Go
192 lines
5.1 KiB
Go
package gateway
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/42wim/matterbridge/bridge"
|
|
"github.com/42wim/matterbridge/bridge/config"
|
|
"github.com/42wim/matterbridge/gateway/samechannel"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type Router struct {
|
|
config.Config
|
|
sync.RWMutex
|
|
|
|
BridgeMap map[string]bridge.Factory
|
|
Gateways map[string]*Gateway
|
|
Message chan config.Message
|
|
MattermostPlugin chan config.Message
|
|
|
|
logger *logrus.Entry
|
|
}
|
|
|
|
// NewRouter initializes a new Matterbridge router for the specified configuration and
|
|
// sets up all required gateways.
|
|
func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
|
|
logger := rootLogger.WithFields(logrus.Fields{"prefix": "router"})
|
|
|
|
r := &Router{
|
|
Config: cfg,
|
|
BridgeMap: bridgeMap,
|
|
Message: make(chan config.Message),
|
|
MattermostPlugin: make(chan config.Message),
|
|
Gateways: make(map[string]*Gateway),
|
|
logger: logger,
|
|
}
|
|
sgw := samechannel.New(cfg)
|
|
gwconfigs := append(sgw.GetConfig(), cfg.BridgeValues().Gateway...)
|
|
|
|
for idx := range gwconfigs {
|
|
entry := &gwconfigs[idx]
|
|
if !entry.Enable {
|
|
continue
|
|
}
|
|
if entry.Name == "" {
|
|
return nil, fmt.Errorf("%s", "Gateway without name found")
|
|
}
|
|
if _, ok := r.Gateways[entry.Name]; ok {
|
|
return nil, fmt.Errorf("Gateway with name %s already exists", entry.Name)
|
|
}
|
|
r.Gateways[entry.Name] = New(rootLogger, entry, r)
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Start will connect all gateways belonging to this router and subsequently route messages
|
|
// between them.
|
|
func (r *Router) Start() error {
|
|
m := make(map[string]*bridge.Bridge)
|
|
if len(r.Gateways) == 0 {
|
|
return fmt.Errorf("no [[gateway]] configured. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info")
|
|
}
|
|
for _, gw := range r.Gateways {
|
|
r.logger.Infof("Parsing gateway %s", gw.Name)
|
|
if len(gw.Bridges) == 0 {
|
|
return fmt.Errorf("no bridges configured for gateway %s. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info", gw.Name)
|
|
}
|
|
for _, br := range gw.Bridges {
|
|
m[br.Account] = br
|
|
}
|
|
}
|
|
for _, br := range m {
|
|
r.logger.Infof("Starting bridge: %s ", br.Account)
|
|
err := br.Connect()
|
|
if err != nil {
|
|
e := fmt.Errorf("Bridge %s failed to start: %v", br.Account, err)
|
|
if r.disableBridge(br, e) {
|
|
continue
|
|
}
|
|
return e
|
|
}
|
|
err = br.JoinChannels()
|
|
if err != nil {
|
|
e := fmt.Errorf("Bridge %s failed to join channel: %v", br.Account, err)
|
|
if r.disableBridge(br, e) {
|
|
continue
|
|
}
|
|
return e
|
|
}
|
|
}
|
|
// remove unused bridges
|
|
for _, gw := range r.Gateways {
|
|
for i, br := range gw.Bridges {
|
|
if br.Bridger == nil {
|
|
r.logger.Errorf("removing failed bridge %s", i)
|
|
delete(gw.Bridges, i)
|
|
}
|
|
}
|
|
}
|
|
go r.handleReceive()
|
|
//go r.updateChannelMembers()
|
|
return nil
|
|
}
|
|
|
|
// disableBridge returns true and empties a bridge if we have IgnoreFailureOnStart configured
|
|
// otherwise returns false
|
|
func (r *Router) disableBridge(br *bridge.Bridge, err error) bool {
|
|
if r.BridgeValues().General.IgnoreFailureOnStart {
|
|
r.logger.Error(err)
|
|
// setting this bridge empty
|
|
*br = bridge.Bridge{}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *Router) getBridge(account string) *bridge.Bridge {
|
|
for _, gw := range r.Gateways {
|
|
if br, ok := gw.Bridges[account]; ok {
|
|
return br
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Router) handleReceive() {
|
|
for msg := range r.Message {
|
|
msg := msg // scopelint
|
|
r.handleEventGetChannelMembers(&msg)
|
|
r.handleEventFailure(&msg)
|
|
r.handleEventRejoinChannels(&msg)
|
|
|
|
// Set message protocol based on the account it came from
|
|
msg.Protocol = r.getBridge(msg.Account).Protocol
|
|
|
|
filesHandled := false
|
|
for _, gw := range r.Gateways {
|
|
// record all the message ID's of the different bridges
|
|
var msgIDs []*BrMsgID
|
|
if gw.ignoreMessage(&msg) {
|
|
continue
|
|
}
|
|
msg.Timestamp = time.Now()
|
|
gw.modifyMessage(&msg)
|
|
if !filesHandled {
|
|
gw.handleFiles(&msg)
|
|
filesHandled = true
|
|
}
|
|
for _, br := range gw.Bridges {
|
|
msgIDs = append(msgIDs, gw.handleMessage(&msg, br)...)
|
|
}
|
|
|
|
if msg.ID != "" {
|
|
_, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID)
|
|
|
|
// Only add the message ID if it doesn't already exist
|
|
//
|
|
// For some bridges we always add/update the message ID.
|
|
// This is necessary as msgIDs will change if a bridge returns
|
|
// a different ID in response to edits.
|
|
if !exists {
|
|
gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
|
|
func (r *Router) updateChannelMembers() {
|
|
// TODO sleep a minute because slack can take a while
|
|
// fix this by having actually connectionDone events send to the router
|
|
time.Sleep(time.Minute)
|
|
for {
|
|
for _, gw := range r.Gateways {
|
|
for _, br := range gw.Bridges {
|
|
// only for slack now
|
|
if br.Protocol != "slack" {
|
|
continue
|
|
}
|
|
r.logger.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
|
|
if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
|
|
r.logger.Errorf("updateChannelMembers: %s", err)
|
|
}
|
|
}
|
|
}
|
|
time.Sleep(time.Minute)
|
|
}
|
|
}
|