server.go (8576B)
1 package syslog 2 3 import ( 4 "bufio" 5 "crypto/tls" 6 "errors" 7 "net" 8 "strings" 9 "sync" 10 "time" 11 12 "gopkg.in/mcuadros/go-syslog.v2/format" 13 ) 14 15 var ( 16 RFC3164 = &format.RFC3164{} // RFC3164: http://www.ietf.org/rfc/rfc3164.txt 17 RFC5424 = &format.RFC5424{} // RFC5424: http://www.ietf.org/rfc/rfc5424.txt 18 RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant 19 Automatic = &format.Automatic{} // Automatically identify the format 20 ) 21 22 const ( 23 datagramChannelBufferSize = 10 24 datagramReadBufferSize = 64 * 1024 25 ) 26 27 // A function type which gets the TLS peer name from the connection. Can return 28 // ok=false to terminate the connection 29 type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool) 30 31 type Server struct { 32 listeners []net.Listener 33 connections []net.PacketConn 34 wait sync.WaitGroup 35 doneTcp chan bool 36 datagramChannel chan DatagramMessage 37 format format.Format 38 handler Handler 39 lastError error 40 readTimeoutMilliseconds int64 41 tlsPeerNameFunc TlsPeerNameFunc 42 datagramPool sync.Pool 43 } 44 45 //NewServer returns a new Server 46 func NewServer() *Server { 47 return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{ 48 New: func() interface{} { 49 return make([]byte, 65536) 50 }, 51 }} 52 } 53 54 //Sets the syslog format (RFC3164 or RFC5424 or RFC6587) 55 func (s *Server) SetFormat(f format.Format) { 56 s.format = f 57 } 58 59 //Sets the handler, this handler with receive every syslog entry 60 func (s *Server) SetHandler(handler Handler) { 61 s.handler = handler 62 } 63 64 //Sets the connection timeout for TCP connections, in milliseconds 65 func (s *Server) SetTimeout(millseconds int64) { 66 s.readTimeoutMilliseconds = millseconds 67 } 68 69 // Set the function that extracts a TLS peer name from the TLS connection 70 func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) { 71 s.tlsPeerNameFunc = tlsPeerNameFunc 72 } 73 74 // Default TLS peer name function - returns the CN of the certificate 75 func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) { 76 state := tlsConn.ConnectionState() 77 if len(state.PeerCertificates) <= 0 { 78 return "", false 79 } 80 cn := state.PeerCertificates[0].Subject.CommonName 81 return cn, true 82 } 83 84 //Configure the server for listen on an UDP addr 85 func (s *Server) ListenUDP(addr string) error { 86 udpAddr, err := net.ResolveUDPAddr("udp", addr) 87 if err != nil { 88 return err 89 } 90 91 connection, err := net.ListenUDP("udp", udpAddr) 92 if err != nil { 93 return err 94 } 95 connection.SetReadBuffer(datagramReadBufferSize) 96 97 s.connections = append(s.connections, connection) 98 return nil 99 } 100 101 //Configure the server for listen on an unix socket 102 func (s *Server) ListenUnixgram(addr string) error { 103 unixAddr, err := net.ResolveUnixAddr("unixgram", addr) 104 if err != nil { 105 return err 106 } 107 108 connection, err := net.ListenUnixgram("unixgram", unixAddr) 109 if err != nil { 110 return err 111 } 112 connection.SetReadBuffer(datagramReadBufferSize) 113 114 s.connections = append(s.connections, connection) 115 return nil 116 } 117 118 //Configure the server for listen on a TCP addr 119 func (s *Server) ListenTCP(addr string) error { 120 tcpAddr, err := net.ResolveTCPAddr("tcp", addr) 121 if err != nil { 122 return err 123 } 124 125 listener, err := net.ListenTCP("tcp", tcpAddr) 126 if err != nil { 127 return err 128 } 129 130 s.doneTcp = make(chan bool) 131 s.listeners = append(s.listeners, listener) 132 return nil 133 } 134 135 //Configure the server for listen on a TCP addr for TLS 136 func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error { 137 listener, err := tls.Listen("tcp", addr, config) 138 if err != nil { 139 return err 140 } 141 142 s.doneTcp = make(chan bool) 143 s.listeners = append(s.listeners, listener) 144 return nil 145 } 146 147 //Starts the server, all the go routines goes to live 148 func (s *Server) Boot() error { 149 if s.format == nil { 150 return errors.New("please set a valid format") 151 } 152 153 if s.handler == nil { 154 return errors.New("please set a valid handler") 155 } 156 157 for _, listener := range s.listeners { 158 s.goAcceptConnection(listener) 159 } 160 161 if len(s.connections) > 0 { 162 s.goParseDatagrams() 163 } 164 165 for _, connection := range s.connections { 166 s.goReceiveDatagrams(connection) 167 } 168 169 return nil 170 } 171 172 func (s *Server) goAcceptConnection(listener net.Listener) { 173 s.wait.Add(1) 174 go func(listener net.Listener) { 175 loop: 176 for { 177 select { 178 case <-s.doneTcp: 179 break loop 180 default: 181 } 182 connection, err := listener.Accept() 183 if err != nil { 184 continue 185 } 186 187 s.goScanConnection(connection) 188 } 189 190 s.wait.Done() 191 }(listener) 192 } 193 194 func (s *Server) goScanConnection(connection net.Conn) { 195 scanner := bufio.NewScanner(connection) 196 if sf := s.format.GetSplitFunc(); sf != nil { 197 scanner.Split(sf) 198 } 199 200 remoteAddr := connection.RemoteAddr() 201 var client string 202 if remoteAddr != nil { 203 client = remoteAddr.String() 204 } 205 206 tlsPeer := "" 207 if tlsConn, ok := connection.(*tls.Conn); ok { 208 // Handshake now so we get the TLS peer information 209 if err := tlsConn.Handshake(); err != nil { 210 connection.Close() 211 return 212 } 213 if s.tlsPeerNameFunc != nil { 214 var ok bool 215 tlsPeer, ok = s.tlsPeerNameFunc(tlsConn) 216 if !ok { 217 connection.Close() 218 return 219 } 220 } 221 } 222 223 var scanCloser *ScanCloser 224 scanCloser = &ScanCloser{scanner, connection} 225 226 s.wait.Add(1) 227 go s.scan(scanCloser, client, tlsPeer) 228 } 229 230 func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) { 231 loop: 232 for { 233 select { 234 case <-s.doneTcp: 235 break loop 236 default: 237 } 238 if s.readTimeoutMilliseconds > 0 { 239 scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond)) 240 } 241 if scanCloser.Scan() { 242 s.parser([]byte(scanCloser.Text()), client, tlsPeer) 243 } else { 244 break loop 245 } 246 } 247 scanCloser.closer.Close() 248 249 s.wait.Done() 250 } 251 252 func (s *Server) parser(line []byte, client string, tlsPeer string) { 253 parser := s.format.GetParser(line) 254 err := parser.Parse() 255 if err != nil { 256 s.lastError = err 257 } 258 259 logParts := parser.Dump() 260 logParts["client"] = client 261 if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) { 262 if i := strings.Index(client, ":"); i > 1 { 263 logParts["hostname"] = client[:i] 264 } else { 265 logParts["hostname"] = client 266 } 267 } 268 logParts["tls_peer"] = tlsPeer 269 270 s.handler.Handle(logParts, int64(len(line)), err) 271 } 272 273 //Returns the last error 274 func (s *Server) GetLastError() error { 275 return s.lastError 276 } 277 278 //Kill the server 279 func (s *Server) Kill() error { 280 for _, connection := range s.connections { 281 err := connection.Close() 282 if err != nil { 283 return err 284 } 285 } 286 287 for _, listener := range s.listeners { 288 err := listener.Close() 289 if err != nil { 290 return err 291 } 292 } 293 // Only need to close channel once to broadcast to all waiting 294 if s.doneTcp != nil { 295 close(s.doneTcp) 296 } 297 if s.datagramChannel != nil { 298 close(s.datagramChannel) 299 } 300 return nil 301 } 302 303 //Waits until the server stops 304 func (s *Server) Wait() { 305 s.wait.Wait() 306 } 307 308 type TimeoutCloser interface { 309 Close() error 310 SetReadDeadline(t time.Time) error 311 } 312 313 type ScanCloser struct { 314 *bufio.Scanner 315 closer TimeoutCloser 316 } 317 318 type DatagramMessage struct { 319 message []byte 320 client string 321 } 322 323 func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) { 324 s.wait.Add(1) 325 go func() { 326 defer s.wait.Done() 327 for { 328 buf := s.datagramPool.Get().([]byte) 329 n, addr, err := packetconn.ReadFrom(buf) 330 if err == nil { 331 // Ignore trailing control characters and NULs 332 for ; (n > 0) && (buf[n-1] < 32); n-- { 333 } 334 if n > 0 { 335 var address string 336 if addr != nil { 337 address = addr.String() 338 } 339 s.datagramChannel <- DatagramMessage{buf[:n], address} 340 } 341 } else { 342 // there has been an error. Either the server has been killed 343 // or may be getting a transitory error due to (e.g.) the 344 // interface being shutdown in which case sleep() to avoid busy wait. 345 opError, ok := err.(*net.OpError) 346 if (ok) && !opError.Temporary() && !opError.Timeout() { 347 return 348 } 349 time.Sleep(10 * time.Millisecond) 350 } 351 } 352 }() 353 } 354 355 func (s *Server) goParseDatagrams() { 356 s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize) 357 358 s.wait.Add(1) 359 go func() { 360 defer s.wait.Done() 361 for { 362 select { 363 case msg, ok := (<-s.datagramChannel): 364 if !ok { 365 return 366 } 367 if sf := s.format.GetSplitFunc(); sf != nil { 368 if _, token, err := sf(msg.message, true); err == nil { 369 s.parser(token, msg.client, "") 370 } 371 } else { 372 s.parser(msg.message, msg.client, "") 373 } 374 s.datagramPool.Put(msg.message[:cap(msg.message)]) 375 } 376 } 377 }() 378 }