mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-11-30 07:32:45 +00:00
379 lines
8.4 KiB
Go
379 lines
8.4 KiB
Go
|
package syslog
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"crypto/tls"
|
||
|
"errors"
|
||
|
"net"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
RFC3164 = &format.RFC3164{} // RFC3164: http://www.ietf.org/rfc/rfc3164.txt
|
||
|
RFC5424 = &format.RFC5424{} // RFC5424: http://www.ietf.org/rfc/rfc5424.txt
|
||
|
RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant
|
||
|
Automatic = &format.Automatic{} // Automatically identify the format
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
datagramChannelBufferSize = 10
|
||
|
datagramReadBufferSize = 64 * 1024
|
||
|
)
|
||
|
|
||
|
// A function type which gets the TLS peer name from the connection. Can return
|
||
|
// ok=false to terminate the connection
|
||
|
type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool)
|
||
|
|
||
|
type Server struct {
|
||
|
listeners []net.Listener
|
||
|
connections []net.PacketConn
|
||
|
wait sync.WaitGroup
|
||
|
doneTcp chan bool
|
||
|
datagramChannel chan DatagramMessage
|
||
|
format format.Format
|
||
|
handler Handler
|
||
|
lastError error
|
||
|
readTimeoutMilliseconds int64
|
||
|
tlsPeerNameFunc TlsPeerNameFunc
|
||
|
datagramPool sync.Pool
|
||
|
}
|
||
|
|
||
|
//NewServer returns a new Server
|
||
|
func NewServer() *Server {
|
||
|
return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{
|
||
|
New: func() interface{} {
|
||
|
return make([]byte, 65536)
|
||
|
},
|
||
|
}}
|
||
|
}
|
||
|
|
||
|
//Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
|
||
|
func (s *Server) SetFormat(f format.Format) {
|
||
|
s.format = f
|
||
|
}
|
||
|
|
||
|
//Sets the handler, this handler with receive every syslog entry
|
||
|
func (s *Server) SetHandler(handler Handler) {
|
||
|
s.handler = handler
|
||
|
}
|
||
|
|
||
|
//Sets the connection timeout for TCP connections, in milliseconds
|
||
|
func (s *Server) SetTimeout(millseconds int64) {
|
||
|
s.readTimeoutMilliseconds = millseconds
|
||
|
}
|
||
|
|
||
|
// Set the function that extracts a TLS peer name from the TLS connection
|
||
|
func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) {
|
||
|
s.tlsPeerNameFunc = tlsPeerNameFunc
|
||
|
}
|
||
|
|
||
|
// Default TLS peer name function - returns the CN of the certificate
|
||
|
func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) {
|
||
|
state := tlsConn.ConnectionState()
|
||
|
if len(state.PeerCertificates) <= 0 {
|
||
|
return "", false
|
||
|
}
|
||
|
cn := state.PeerCertificates[0].Subject.CommonName
|
||
|
return cn, true
|
||
|
}
|
||
|
|
||
|
//Configure the server for listen on an UDP addr
|
||
|
func (s *Server) ListenUDP(addr string) error {
|
||
|
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
connection, err := net.ListenUDP("udp", udpAddr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
connection.SetReadBuffer(datagramReadBufferSize)
|
||
|
|
||
|
s.connections = append(s.connections, connection)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//Configure the server for listen on an unix socket
|
||
|
func (s *Server) ListenUnixgram(addr string) error {
|
||
|
unixAddr, err := net.ResolveUnixAddr("unixgram", addr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
connection, err := net.ListenUnixgram("unixgram", unixAddr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
connection.SetReadBuffer(datagramReadBufferSize)
|
||
|
|
||
|
s.connections = append(s.connections, connection)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//Configure the server for listen on a TCP addr
|
||
|
func (s *Server) ListenTCP(addr string) error {
|
||
|
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
listener, err := net.ListenTCP("tcp", tcpAddr)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
s.doneTcp = make(chan bool)
|
||
|
s.listeners = append(s.listeners, listener)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//Configure the server for listen on a TCP addr for TLS
|
||
|
func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error {
|
||
|
listener, err := tls.Listen("tcp", addr, config)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
s.doneTcp = make(chan bool)
|
||
|
s.listeners = append(s.listeners, listener)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//Starts the server, all the go routines goes to live
|
||
|
func (s *Server) Boot() error {
|
||
|
if s.format == nil {
|
||
|
return errors.New("please set a valid format")
|
||
|
}
|
||
|
|
||
|
if s.handler == nil {
|
||
|
return errors.New("please set a valid handler")
|
||
|
}
|
||
|
|
||
|
for _, listener := range s.listeners {
|
||
|
s.goAcceptConnection(listener)
|
||
|
}
|
||
|
|
||
|
if len(s.connections) > 0 {
|
||
|
s.goParseDatagrams()
|
||
|
}
|
||
|
|
||
|
for _, connection := range s.connections {
|
||
|
s.goReceiveDatagrams(connection)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Server) goAcceptConnection(listener net.Listener) {
|
||
|
s.wait.Add(1)
|
||
|
go func(listener net.Listener) {
|
||
|
loop:
|
||
|
for {
|
||
|
select {
|
||
|
case <-s.doneTcp:
|
||
|
break loop
|
||
|
default:
|
||
|
}
|
||
|
connection, err := listener.Accept()
|
||
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
s.goScanConnection(connection)
|
||
|
}
|
||
|
|
||
|
s.wait.Done()
|
||
|
}(listener)
|
||
|
}
|
||
|
|
||
|
func (s *Server) goScanConnection(connection net.Conn) {
|
||
|
scanner := bufio.NewScanner(connection)
|
||
|
if sf := s.format.GetSplitFunc(); sf != nil {
|
||
|
scanner.Split(sf)
|
||
|
}
|
||
|
|
||
|
remoteAddr := connection.RemoteAddr()
|
||
|
var client string
|
||
|
if remoteAddr != nil {
|
||
|
client = remoteAddr.String()
|
||
|
}
|
||
|
|
||
|
tlsPeer := ""
|
||
|
if tlsConn, ok := connection.(*tls.Conn); ok {
|
||
|
// Handshake now so we get the TLS peer information
|
||
|
if err := tlsConn.Handshake(); err != nil {
|
||
|
connection.Close()
|
||
|
return
|
||
|
}
|
||
|
if s.tlsPeerNameFunc != nil {
|
||
|
var ok bool
|
||
|
tlsPeer, ok = s.tlsPeerNameFunc(tlsConn)
|
||
|
if !ok {
|
||
|
connection.Close()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var scanCloser *ScanCloser
|
||
|
scanCloser = &ScanCloser{scanner, connection}
|
||
|
|
||
|
s.wait.Add(1)
|
||
|
go s.scan(scanCloser, client, tlsPeer)
|
||
|
}
|
||
|
|
||
|
func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) {
|
||
|
loop:
|
||
|
for {
|
||
|
select {
|
||
|
case <-s.doneTcp:
|
||
|
break loop
|
||
|
default:
|
||
|
}
|
||
|
if s.readTimeoutMilliseconds > 0 {
|
||
|
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
|
||
|
}
|
||
|
if scanCloser.Scan() {
|
||
|
s.parser([]byte(scanCloser.Text()), client, tlsPeer)
|
||
|
} else {
|
||
|
break loop
|
||
|
}
|
||
|
}
|
||
|
scanCloser.closer.Close()
|
||
|
|
||
|
s.wait.Done()
|
||
|
}
|
||
|
|
||
|
func (s *Server) parser(line []byte, client string, tlsPeer string) {
|
||
|
parser := s.format.GetParser(line)
|
||
|
err := parser.Parse()
|
||
|
if err != nil {
|
||
|
s.lastError = err
|
||
|
}
|
||
|
|
||
|
logParts := parser.Dump()
|
||
|
logParts["client"] = client
|
||
|
if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) {
|
||
|
if i := strings.Index(client, ":"); i > 1 {
|
||
|
logParts["hostname"] = client[:i]
|
||
|
} else {
|
||
|
logParts["hostname"] = client
|
||
|
}
|
||
|
}
|
||
|
logParts["tls_peer"] = tlsPeer
|
||
|
|
||
|
s.handler.Handle(logParts, int64(len(line)), err)
|
||
|
}
|
||
|
|
||
|
//Returns the last error
|
||
|
func (s *Server) GetLastError() error {
|
||
|
return s.lastError
|
||
|
}
|
||
|
|
||
|
//Kill the server
|
||
|
func (s *Server) Kill() error {
|
||
|
for _, connection := range s.connections {
|
||
|
err := connection.Close()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for _, listener := range s.listeners {
|
||
|
err := listener.Close()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
// Only need to close channel once to broadcast to all waiting
|
||
|
if s.doneTcp != nil {
|
||
|
close(s.doneTcp)
|
||
|
}
|
||
|
if s.datagramChannel != nil {
|
||
|
close(s.datagramChannel)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//Waits until the server stops
|
||
|
func (s *Server) Wait() {
|
||
|
s.wait.Wait()
|
||
|
}
|
||
|
|
||
|
type TimeoutCloser interface {
|
||
|
Close() error
|
||
|
SetReadDeadline(t time.Time) error
|
||
|
}
|
||
|
|
||
|
type ScanCloser struct {
|
||
|
*bufio.Scanner
|
||
|
closer TimeoutCloser
|
||
|
}
|
||
|
|
||
|
type DatagramMessage struct {
|
||
|
message []byte
|
||
|
client string
|
||
|
}
|
||
|
|
||
|
func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) {
|
||
|
s.wait.Add(1)
|
||
|
go func() {
|
||
|
defer s.wait.Done()
|
||
|
for {
|
||
|
buf := s.datagramPool.Get().([]byte)
|
||
|
n, addr, err := packetconn.ReadFrom(buf)
|
||
|
if err == nil {
|
||
|
// Ignore trailing control characters and NULs
|
||
|
for ; (n > 0) && (buf[n-1] < 32); n-- {
|
||
|
}
|
||
|
if n > 0 {
|
||
|
var address string
|
||
|
if addr != nil {
|
||
|
address = addr.String()
|
||
|
}
|
||
|
s.datagramChannel <- DatagramMessage{buf[:n], address}
|
||
|
}
|
||
|
} else {
|
||
|
// there has been an error. Either the server has been killed
|
||
|
// or may be getting a transitory error due to (e.g.) the
|
||
|
// interface being shutdown in which case sleep() to avoid busy wait.
|
||
|
opError, ok := err.(*net.OpError)
|
||
|
if (ok) && !opError.Temporary() && !opError.Timeout() {
|
||
|
return
|
||
|
}
|
||
|
time.Sleep(10 * time.Millisecond)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (s *Server) goParseDatagrams() {
|
||
|
s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize)
|
||
|
|
||
|
s.wait.Add(1)
|
||
|
go func() {
|
||
|
defer s.wait.Done()
|
||
|
for {
|
||
|
select {
|
||
|
case msg, ok := (<-s.datagramChannel):
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
if sf := s.format.GetSplitFunc(); sf != nil {
|
||
|
if _, token, err := sf(msg.message, true); err == nil {
|
||
|
s.parser(token, msg.client, "")
|
||
|
}
|
||
|
} else {
|
||
|
s.parser(msg.message, msg.client, "")
|
||
|
}
|
||
|
s.datagramPool.Put(msg.message[:cap(msg.message)])
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|