gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

server.go (8576B)


      1 package syslog
      2 
      3 import (
      4 	"bufio"
      5 	"crypto/tls"
      6 	"errors"
      7 	"net"
      8 	"strings"
      9 	"sync"
     10 	"time"
     11 
     12 	"gopkg.in/mcuadros/go-syslog.v2/format"
     13 )
     14 
     15 var (
     16 	RFC3164   = &format.RFC3164{}   // RFC3164: http://www.ietf.org/rfc/rfc3164.txt
     17 	RFC5424   = &format.RFC5424{}   // RFC5424: http://www.ietf.org/rfc/rfc5424.txt
     18 	RFC6587   = &format.RFC6587{}   // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant
     19 	Automatic = &format.Automatic{} // Automatically identify the format
     20 )
     21 
     // Sizing of the datagram receive path.
     const (
     	// datagramChannelBufferSize is the capacity of the channel that queues
     	// received datagrams until they are parsed.
     	datagramChannelBufferSize = 10

     	// datagramReadBufferSize (64 KiB) is the size requested through
     	// SetReadBuffer on datagram sockets.
     	datagramReadBufferSize = 1 << 16
     )
     26 
     // TlsPeerNameFunc is a function that gets the TLS peer name from a TLS
     // connection. The first result is the peer name; the second result (ok) can
     // be returned as false to terminate the connection.
     type TlsPeerNameFunc func(*tls.Conn) (string, bool)
     30 
     31 type Server struct {
     32 	listeners               []net.Listener
     33 	connections             []net.PacketConn
     34 	wait                    sync.WaitGroup
     35 	doneTcp                 chan bool
     36 	datagramChannel         chan DatagramMessage
     37 	format                  format.Format
     38 	handler                 Handler
     39 	lastError               error
     40 	readTimeoutMilliseconds int64
     41 	tlsPeerNameFunc         TlsPeerNameFunc
     42 	datagramPool            sync.Pool
     43 }
     44 
     45 //NewServer returns a new Server
     46 func NewServer() *Server {
     47 	return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{
     48 		New: func() interface{} {
     49 			return make([]byte, 65536)
     50 		},
     51 	}}
     52 }
     53 
     54 //Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
     55 func (s *Server) SetFormat(f format.Format) {
     56 	s.format = f
     57 }
     58 
     59 //Sets the handler, this handler with receive every syslog entry
     60 func (s *Server) SetHandler(handler Handler) {
     61 	s.handler = handler
     62 }
     63 
     64 //Sets the connection timeout for TCP connections, in milliseconds
     65 func (s *Server) SetTimeout(millseconds int64) {
     66 	s.readTimeoutMilliseconds = millseconds
     67 }
     68 
     69 // Set the function that extracts a TLS peer name from the TLS connection
     70 func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) {
     71 	s.tlsPeerNameFunc = tlsPeerNameFunc
     72 }
     73 
     // defaultTlsPeerName is the default TLS peer name function. It returns the
     // CommonName of the first peer certificate, or ok=false when the peer
     // presented no certificate.
     func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) {
     	certs := tlsConn.ConnectionState().PeerCertificates
     	if len(certs) == 0 {
     		return "", false
     	}
     	return certs[0].Subject.CommonName, true
     }
     83 
     84 //Configure the server for listen on an UDP addr
     85 func (s *Server) ListenUDP(addr string) error {
     86 	udpAddr, err := net.ResolveUDPAddr("udp", addr)
     87 	if err != nil {
     88 		return err
     89 	}
     90 
     91 	connection, err := net.ListenUDP("udp", udpAddr)
     92 	if err != nil {
     93 		return err
     94 	}
     95 	connection.SetReadBuffer(datagramReadBufferSize)
     96 
     97 	s.connections = append(s.connections, connection)
     98 	return nil
     99 }
    100 
    101 //Configure the server for listen on an unix socket
    102 func (s *Server) ListenUnixgram(addr string) error {
    103 	unixAddr, err := net.ResolveUnixAddr("unixgram", addr)
    104 	if err != nil {
    105 		return err
    106 	}
    107 
    108 	connection, err := net.ListenUnixgram("unixgram", unixAddr)
    109 	if err != nil {
    110 		return err
    111 	}
    112 	connection.SetReadBuffer(datagramReadBufferSize)
    113 
    114 	s.connections = append(s.connections, connection)
    115 	return nil
    116 }
    117 
    118 //Configure the server for listen on a TCP addr
    119 func (s *Server) ListenTCP(addr string) error {
    120 	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
    121 	if err != nil {
    122 		return err
    123 	}
    124 
    125 	listener, err := net.ListenTCP("tcp", tcpAddr)
    126 	if err != nil {
    127 		return err
    128 	}
    129 
    130 	s.doneTcp = make(chan bool)
    131 	s.listeners = append(s.listeners, listener)
    132 	return nil
    133 }
    134 
    135 //Configure the server for listen on a TCP addr for TLS
    136 func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error {
    137 	listener, err := tls.Listen("tcp", addr, config)
    138 	if err != nil {
    139 		return err
    140 	}
    141 
    142 	s.doneTcp = make(chan bool)
    143 	s.listeners = append(s.listeners, listener)
    144 	return nil
    145 }
    146 
    147 //Starts the server, all the go routines goes to live
    148 func (s *Server) Boot() error {
    149 	if s.format == nil {
    150 		return errors.New("please set a valid format")
    151 	}
    152 
    153 	if s.handler == nil {
    154 		return errors.New("please set a valid handler")
    155 	}
    156 
    157 	for _, listener := range s.listeners {
    158 		s.goAcceptConnection(listener)
    159 	}
    160 
    161 	if len(s.connections) > 0 {
    162 		s.goParseDatagrams()
    163 	}
    164 
    165 	for _, connection := range s.connections {
    166 		s.goReceiveDatagrams(connection)
    167 	}
    168 
    169 	return nil
    170 }
    171 
    172 func (s *Server) goAcceptConnection(listener net.Listener) {
    173 	s.wait.Add(1)
    174 	go func(listener net.Listener) {
    175 	loop:
    176 		for {
    177 			select {
    178 			case <-s.doneTcp:
    179 				break loop
    180 			default:
    181 			}
    182 			connection, err := listener.Accept()
    183 			if err != nil {
    184 				continue
    185 			}
    186 
    187 			s.goScanConnection(connection)
    188 		}
    189 
    190 		s.wait.Done()
    191 	}(listener)
    192 }
    193 
    194 func (s *Server) goScanConnection(connection net.Conn) {
    195 	scanner := bufio.NewScanner(connection)
    196 	if sf := s.format.GetSplitFunc(); sf != nil {
    197 		scanner.Split(sf)
    198 	}
    199 
    200 	remoteAddr := connection.RemoteAddr()
    201 	var client string
    202 	if remoteAddr != nil {
    203 		client = remoteAddr.String()
    204 	}
    205 
    206 	tlsPeer := ""
    207 	if tlsConn, ok := connection.(*tls.Conn); ok {
    208 		// Handshake now so we get the TLS peer information
    209 		if err := tlsConn.Handshake(); err != nil {
    210 			connection.Close()
    211 			return
    212 		}
    213 		if s.tlsPeerNameFunc != nil {
    214 			var ok bool
    215 			tlsPeer, ok = s.tlsPeerNameFunc(tlsConn)
    216 			if !ok {
    217 				connection.Close()
    218 				return
    219 			}
    220 		}
    221 	}
    222 
    223 	var scanCloser *ScanCloser
    224 	scanCloser = &ScanCloser{scanner, connection}
    225 
    226 	s.wait.Add(1)
    227 	go s.scan(scanCloser, client, tlsPeer)
    228 }
    229 
    230 func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) {
    231 loop:
    232 	for {
    233 		select {
    234 		case <-s.doneTcp:
    235 			break loop
    236 		default:
    237 		}
    238 		if s.readTimeoutMilliseconds > 0 {
    239 			scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
    240 		}
    241 		if scanCloser.Scan() {
    242 			s.parser([]byte(scanCloser.Text()), client, tlsPeer)
    243 		} else {
    244 			break loop
    245 		}
    246 	}
    247 	scanCloser.closer.Close()
    248 
    249 	s.wait.Done()
    250 }
    251 
    252 func (s *Server) parser(line []byte, client string, tlsPeer string) {
    253 	parser := s.format.GetParser(line)
    254 	err := parser.Parse()
    255 	if err != nil {
    256 		s.lastError = err
    257 	}
    258 
    259 	logParts := parser.Dump()
    260 	logParts["client"] = client
    261 	if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) {
    262 		if i := strings.Index(client, ":"); i > 1 {
    263 			logParts["hostname"] = client[:i]
    264 		} else {
    265 			logParts["hostname"] = client
    266 		}
    267 	}
    268 	logParts["tls_peer"] = tlsPeer
    269 
    270 	s.handler.Handle(logParts, int64(len(line)), err)
    271 }
    272 
    273 //Returns the last error
    274 func (s *Server) GetLastError() error {
    275 	return s.lastError
    276 }
    277 
    278 //Kill the server
    279 func (s *Server) Kill() error {
    280 	for _, connection := range s.connections {
    281 		err := connection.Close()
    282 		if err != nil {
    283 			return err
    284 		}
    285 	}
    286 
    287 	for _, listener := range s.listeners {
    288 		err := listener.Close()
    289 		if err != nil {
    290 			return err
    291 		}
    292 	}
    293 	// Only need to close channel once to broadcast to all waiting
    294 	if s.doneTcp != nil {
    295 		close(s.doneTcp)
    296 	}
    297 	if s.datagramChannel != nil {
    298 		close(s.datagramChannel)
    299 	}
    300 	return nil
    301 }
    302 
    303 //Waits until the server stops
    304 func (s *Server) Wait() {
    305 	s.wait.Wait()
    306 }
    307 
    // TimeoutCloser is the part of a connection the scan loop needs besides
    // reading: setting a read deadline and closing the connection.
    type TimeoutCloser interface {
    	SetReadDeadline(time.Time) error
    	Close() error
    }
    312 
    313 type ScanCloser struct {
    314 	*bufio.Scanner
    315 	closer TimeoutCloser
    316 }
    317 
    // DatagramMessage is a received datagram queued for parsing.
    type DatagramMessage struct {
    	// message holds the datagram payload. It is a slice of a buffer taken
    	// from the server's datagram pool.
    	message []byte

    	// client is the sender's address in string form, or "" when the read
    	// reported no address.
    	client string
    }
    322 
    323 func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) {
    324 	s.wait.Add(1)
    325 	go func() {
    326 		defer s.wait.Done()
    327 		for {
    328 			buf := s.datagramPool.Get().([]byte)
    329 			n, addr, err := packetconn.ReadFrom(buf)
    330 			if err == nil {
    331 				// Ignore trailing control characters and NULs
    332 				for ; (n > 0) && (buf[n-1] < 32); n-- {
    333 				}
    334 				if n > 0 {
    335 					var address string
    336 					if addr != nil {
    337 						address = addr.String()
    338 					}
    339 					s.datagramChannel <- DatagramMessage{buf[:n], address}
    340 				}
    341 			} else {
    342 				// there has been an error. Either the server has been killed
    343 				// or may be getting a transitory error due to (e.g.) the
    344 				// interface being shutdown in which case sleep() to avoid busy wait.
    345 				opError, ok := err.(*net.OpError)
    346 				if (ok) && !opError.Temporary() && !opError.Timeout() {
    347 					return
    348 				}
    349 				time.Sleep(10 * time.Millisecond)
    350 			}
    351 		}
    352 	}()
    353 }
    354 
    355 func (s *Server) goParseDatagrams() {
    356 	s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize)
    357 
    358 	s.wait.Add(1)
    359 	go func() {
    360 		defer s.wait.Done()
    361 		for {
    362 			select {
    363 			case msg, ok := (<-s.datagramChannel):
    364 				if !ok {
    365 					return
    366 				}
    367 				if sf := s.format.GetSplitFunc(); sf != nil {
    368 					if _, token, err := sf(msg.message, true); err == nil {
    369 						s.parser(token, msg.client, "")
    370 					}
    371 				} else {
    372 					s.parser(msg.message, msg.client, "")
    373 				}
    374 				s.datagramPool.Put(msg.message[:cap(msg.message)])
    375 			}
    376 		}
    377 	}()
    378 }