conn.go (26249B)
package dbus

import (
	"context"
	"errors"
	"io"
	"os"
	"strings"
	"sync"
)

// Process-wide cache of the shared bus connections handed out by SystemBus
// and SessionBus. Each cached connection is guarded by its own mutex.
var (
	systemBus     *Conn
	systemBusLck  sync.Mutex
	sessionBus    *Conn
	sessionBusLck sync.Mutex
)

// ErrClosed is the error returned by calls on a closed connection.
var ErrClosed = errors.New("dbus: connection closed by user")

// Conn represents a connection to a message bus (usually, the system or
// session bus).
//
// Connections are either shared or private. Shared connections
// are shared between calls to the functions that return them. As a result,
// the methods Close, Auth and Hello must not be called on them.
//
// Multiple goroutines may invoke methods on a connection simultaneously.
type Conn struct {
	// transport is the underlying message transport; its methods are
	// promoted onto Conn.
	transport

	// ctx is the connection context set up by newConn; cancelCtx cancels
	// it and is invoked by Close.
	ctx       context.Context
	cancelCtx context.CancelFunc

	// closeOnce makes Close run its teardown a single time; closeErr keeps
	// the result of closing the transport so later Close calls return it.
	closeOnce sync.Once
	closeErr  error

	// busObj is the org.freedesktop.DBus object at /org/freedesktop/DBus.
	busObj BusObject
	// unixFD is the value reported by SupportsUnixFDs; it is not assigned
	// in the visible part of this file.
	unixFD bool
	// uuid is not referenced in the visible part of this file.
	uuid string

	// handler and signalHandler fall back to the package defaults in
	// newConn when no option supplied them; the same holds for serialGen.
	handler       Handler
	signalHandler SignalHandler
	serialGen     SerialGenerator
	// inInt and outInt, when non-nil, are invoked on every incoming and
	// outgoing message respectively.
	inInt  Interceptor
	outInt Interceptor
	// auth holds the methods set through WithAuth; Connect passes them to Auth.
	auth []Auth

	names      *nameTracker   // unique name and well-known names of this connection
	calls      *callTracker   // pending method calls keyed by serial
	outHandler *outputHandler // serialises outgoing messages and tracks closure

	// eavesdropped, when non-nil, receives every incoming message instead
	// of the normal dispatch; it is guarded by eavesdroppedLck.
	eavesdropped    chan<- *Message
	eavesdroppedLck sync.Mutex
}

// SessionBus returns a shared connection to the session bus, connecting to it
// if not already done.
60 func SessionBus() (conn *Conn, err error) { 61 sessionBusLck.Lock() 62 defer sessionBusLck.Unlock() 63 if sessionBus != nil && 64 sessionBus.Connected() { 65 return sessionBus, nil 66 } 67 defer func() { 68 if conn != nil { 69 sessionBus = conn 70 } 71 }() 72 conn, err = ConnectSessionBus() 73 return 74 } 75 76 func getSessionBusAddress() (string, error) { 77 if address := os.Getenv("DBUS_SESSION_BUS_ADDRESS"); address != "" && address != "autolaunch:" { 78 return address, nil 79 80 } else if address := tryDiscoverDbusSessionBusAddress(); address != "" { 81 os.Setenv("DBUS_SESSION_BUS_ADDRESS", address) 82 return address, nil 83 } 84 return getSessionBusPlatformAddress() 85 } 86 87 // SessionBusPrivate returns a new private connection to the session bus. 88 func SessionBusPrivate(opts ...ConnOption) (*Conn, error) { 89 address, err := getSessionBusAddress() 90 if err != nil { 91 return nil, err 92 } 93 94 return Dial(address, opts...) 95 } 96 97 // SessionBusPrivate returns a new private connection to the session bus. 98 // 99 // Deprecated: use SessionBusPrivate with options instead. 100 func SessionBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) { 101 return SessionBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler)) 102 } 103 104 // SystemBus returns a shared connection to the system bus, connecting to it if 105 // not already done. 106 func SystemBus() (conn *Conn, err error) { 107 systemBusLck.Lock() 108 defer systemBusLck.Unlock() 109 if systemBus != nil && 110 systemBus.Connected() { 111 return systemBus, nil 112 } 113 defer func() { 114 if conn != nil { 115 systemBus = conn 116 } 117 }() 118 conn, err = ConnectSystemBus() 119 return 120 } 121 122 // ConnectSessionBus connects to the session bus. 123 func ConnectSessionBus(opts ...ConnOption) (*Conn, error) { 124 address, err := getSessionBusAddress() 125 if err != nil { 126 return nil, err 127 } 128 return Connect(address, opts...) 
129 } 130 131 // ConnectSystemBus connects to the system bus. 132 func ConnectSystemBus(opts ...ConnOption) (*Conn, error) { 133 return Connect(getSystemBusPlatformAddress(), opts...) 134 } 135 136 // Connect connects to the given address. 137 // 138 // Returned connection is ready to use and doesn't require calling 139 // Auth and Hello methods to make it usable. 140 func Connect(address string, opts ...ConnOption) (*Conn, error) { 141 conn, err := Dial(address, opts...) 142 if err != nil { 143 return nil, err 144 } 145 if err = conn.Auth(conn.auth); err != nil { 146 _ = conn.Close() 147 return nil, err 148 } 149 if err = conn.Hello(); err != nil { 150 _ = conn.Close() 151 return nil, err 152 } 153 return conn, nil 154 } 155 156 // SystemBusPrivate returns a new private connection to the system bus. 157 // Note: this connection is not ready to use. One must perform Auth and Hello 158 // on the connection before it is useable. 159 func SystemBusPrivate(opts ...ConnOption) (*Conn, error) { 160 return Dial(getSystemBusPlatformAddress(), opts...) 161 } 162 163 // SystemBusPrivateHandler returns a new private connection to the system bus, using the provided handlers. 164 // 165 // Deprecated: use SystemBusPrivate with options instead. 166 func SystemBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) { 167 return SystemBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler)) 168 } 169 170 // Dial establishes a new private connection to the message bus specified by address. 171 func Dial(address string, opts ...ConnOption) (*Conn, error) { 172 tr, err := getTransport(address) 173 if err != nil { 174 return nil, err 175 } 176 return newConn(tr, opts...) 177 } 178 179 // DialHandler establishes a new private connection to the message bus specified by address, using the supplied handlers. 180 // 181 // Deprecated: use Dial with options instead. 
182 func DialHandler(address string, handler Handler, signalHandler SignalHandler) (*Conn, error) { 183 return Dial(address, WithSignalHandler(signalHandler)) 184 } 185 186 // ConnOption is a connection option. 187 type ConnOption func(conn *Conn) error 188 189 // WithHandler overrides the default handler. 190 func WithHandler(handler Handler) ConnOption { 191 return func(conn *Conn) error { 192 conn.handler = handler 193 return nil 194 } 195 } 196 197 // WithSignalHandler overrides the default signal handler. 198 func WithSignalHandler(handler SignalHandler) ConnOption { 199 return func(conn *Conn) error { 200 conn.signalHandler = handler 201 return nil 202 } 203 } 204 205 // WithSerialGenerator overrides the default signals generator. 206 func WithSerialGenerator(gen SerialGenerator) ConnOption { 207 return func(conn *Conn) error { 208 conn.serialGen = gen 209 return nil 210 } 211 } 212 213 // WithAuth sets authentication methods for the auth conversation. 214 func WithAuth(methods ...Auth) ConnOption { 215 return func(conn *Conn) error { 216 conn.auth = methods 217 return nil 218 } 219 } 220 221 // Interceptor intercepts incoming and outgoing messages. 222 type Interceptor func(msg *Message) 223 224 // WithIncomingInterceptor sets the given interceptor for incoming messages. 225 func WithIncomingInterceptor(interceptor Interceptor) ConnOption { 226 return func(conn *Conn) error { 227 conn.inInt = interceptor 228 return nil 229 } 230 } 231 232 // WithOutgoingInterceptor sets the given interceptor for outgoing messages. 233 func WithOutgoingInterceptor(interceptor Interceptor) ConnOption { 234 return func(conn *Conn) error { 235 conn.outInt = interceptor 236 return nil 237 } 238 } 239 240 // WithContext overrides the default context for the connection. 
241 func WithContext(ctx context.Context) ConnOption { 242 return func(conn *Conn) error { 243 conn.ctx = ctx 244 return nil 245 } 246 } 247 248 // NewConn creates a new private *Conn from an already established connection. 249 func NewConn(conn io.ReadWriteCloser, opts ...ConnOption) (*Conn, error) { 250 return newConn(genericTransport{conn}, opts...) 251 } 252 253 // NewConnHandler creates a new private *Conn from an already established connection, using the supplied handlers. 254 // 255 // Deprecated: use NewConn with options instead. 256 func NewConnHandler(conn io.ReadWriteCloser, handler Handler, signalHandler SignalHandler) (*Conn, error) { 257 return NewConn(genericTransport{conn}, WithHandler(handler), WithSignalHandler(signalHandler)) 258 } 259 260 // newConn creates a new *Conn from a transport. 261 func newConn(tr transport, opts ...ConnOption) (*Conn, error) { 262 conn := new(Conn) 263 conn.transport = tr 264 for _, opt := range opts { 265 if err := opt(conn); err != nil { 266 return nil, err 267 } 268 } 269 if conn.ctx == nil { 270 conn.ctx = context.Background() 271 } 272 conn.ctx, conn.cancelCtx = context.WithCancel(conn.ctx) 273 go func() { 274 <-conn.ctx.Done() 275 conn.Close() 276 }() 277 278 conn.calls = newCallTracker() 279 if conn.handler == nil { 280 conn.handler = NewDefaultHandler() 281 } 282 if conn.signalHandler == nil { 283 conn.signalHandler = NewDefaultSignalHandler() 284 } 285 if conn.serialGen == nil { 286 conn.serialGen = newSerialGenerator() 287 } 288 conn.outHandler = &outputHandler{conn: conn} 289 conn.names = newNameTracker() 290 conn.busObj = conn.Object("org.freedesktop.DBus", "/org/freedesktop/DBus") 291 return conn, nil 292 } 293 294 // BusObject returns the object owned by the bus daemon which handles 295 // administrative requests. 296 func (conn *Conn) BusObject() BusObject { 297 return conn.busObj 298 } 299 300 // Close closes the connection. 
Any blocked operations will return with errors 301 // and the channels passed to Eavesdrop and Signal are closed. This method must 302 // not be called on shared connections. 303 func (conn *Conn) Close() error { 304 conn.closeOnce.Do(func() { 305 conn.outHandler.close() 306 if term, ok := conn.signalHandler.(Terminator); ok { 307 term.Terminate() 308 } 309 310 if term, ok := conn.handler.(Terminator); ok { 311 term.Terminate() 312 } 313 314 conn.eavesdroppedLck.Lock() 315 if conn.eavesdropped != nil { 316 close(conn.eavesdropped) 317 } 318 conn.eavesdroppedLck.Unlock() 319 320 conn.cancelCtx() 321 322 conn.closeErr = conn.transport.Close() 323 }) 324 return conn.closeErr 325 } 326 327 // Context returns the context associated with the connection. The 328 // context will be cancelled when the connection is closed. 329 func (conn *Conn) Context() context.Context { 330 return conn.ctx 331 } 332 333 // Connected returns whether conn is connected 334 func (conn *Conn) Connected() bool { 335 return conn.ctx.Err() == nil 336 } 337 338 // Eavesdrop causes conn to send all incoming messages to the given channel 339 // without further processing. Method replies, errors and signals will not be 340 // sent to the appropriate channels and method calls will not be handled. If nil 341 // is passed, the normal behaviour is restored. 342 // 343 // The caller has to make sure that ch is sufficiently buffered; 344 // if a message arrives when a write to ch is not possible, the message is 345 // discarded. 346 func (conn *Conn) Eavesdrop(ch chan<- *Message) { 347 conn.eavesdroppedLck.Lock() 348 conn.eavesdropped = ch 349 conn.eavesdroppedLck.Unlock() 350 } 351 352 // getSerial returns an unused serial. 353 func (conn *Conn) getSerial() uint32 { 354 return conn.serialGen.GetSerial() 355 } 356 357 // Hello sends the initial org.freedesktop.DBus.Hello call. This method must be 358 // called after authentication, but before sending any other messages to the 359 // bus. 
Hello must not be called for shared connections. 360 func (conn *Conn) Hello() error { 361 var s string 362 err := conn.busObj.Call("org.freedesktop.DBus.Hello", 0).Store(&s) 363 if err != nil { 364 return err 365 } 366 conn.names.acquireUniqueConnectionName(s) 367 return nil 368 } 369 370 // inWorker runs in an own goroutine, reading incoming messages from the 371 // transport and dispatching them appropriately. 372 func (conn *Conn) inWorker() { 373 sequenceGen := newSequenceGenerator() 374 for { 375 msg, err := conn.ReadMessage() 376 if err != nil { 377 if _, ok := err.(InvalidMessageError); !ok { 378 // Some read error occurred (usually EOF); we can't really do 379 // anything but to shut down all stuff and returns errors to all 380 // pending replies. 381 conn.Close() 382 conn.calls.finalizeAllWithError(sequenceGen, err) 383 return 384 } 385 // invalid messages are ignored 386 continue 387 } 388 conn.eavesdroppedLck.Lock() 389 if conn.eavesdropped != nil { 390 select { 391 case conn.eavesdropped <- msg: 392 default: 393 } 394 conn.eavesdroppedLck.Unlock() 395 continue 396 } 397 conn.eavesdroppedLck.Unlock() 398 dest, _ := msg.Headers[FieldDestination].value.(string) 399 found := dest == "" || 400 !conn.names.uniqueNameIsKnown() || 401 conn.names.isKnownName(dest) 402 if !found { 403 // Eavesdropped a message, but no channel for it is registered. 404 // Ignore it. 
405 continue 406 } 407 408 if conn.inInt != nil { 409 conn.inInt(msg) 410 } 411 sequence := sequenceGen.next() 412 switch msg.Type { 413 case TypeError: 414 conn.serialGen.RetireSerial(conn.calls.handleDBusError(sequence, msg)) 415 case TypeMethodReply: 416 conn.serialGen.RetireSerial(conn.calls.handleReply(sequence, msg)) 417 case TypeSignal: 418 conn.handleSignal(sequence, msg) 419 case TypeMethodCall: 420 go conn.handleCall(msg) 421 } 422 423 } 424 } 425 426 func (conn *Conn) handleSignal(sequence Sequence, msg *Message) { 427 iface := msg.Headers[FieldInterface].value.(string) 428 member := msg.Headers[FieldMember].value.(string) 429 // as per http://dbus.freedesktop.org/doc/dbus-specification.html , 430 // sender is optional for signals. 431 sender, _ := msg.Headers[FieldSender].value.(string) 432 if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" { 433 if member == "NameLost" { 434 // If we lost the name on the bus, remove it from our 435 // tracking list. 436 name, ok := msg.Body[0].(string) 437 if !ok { 438 panic("Unable to read the lost name") 439 } 440 conn.names.loseName(name) 441 } else if member == "NameAcquired" { 442 // If we acquired the name on the bus, add it to our 443 // tracking list. 444 name, ok := msg.Body[0].(string) 445 if !ok { 446 panic("Unable to read the acquired name") 447 } 448 conn.names.acquireName(name) 449 } 450 } 451 signal := &Signal{ 452 Sender: sender, 453 Path: msg.Headers[FieldPath].value.(ObjectPath), 454 Name: iface + "." + member, 455 Body: msg.Body, 456 Sequence: sequence, 457 } 458 conn.signalHandler.DeliverSignal(iface, member, signal) 459 } 460 461 // Names returns the list of all names that are currently owned by this 462 // connection. The slice is always at least one element long, the first element 463 // being the unique name of the connection. 
464 func (conn *Conn) Names() []string { 465 return conn.names.listKnownNames() 466 } 467 468 // Object returns the object identified by the given destination name and path. 469 func (conn *Conn) Object(dest string, path ObjectPath) BusObject { 470 return &Object{conn, dest, path} 471 } 472 473 func (conn *Conn) sendMessageAndIfClosed(msg *Message, ifClosed func()) { 474 if msg.serial == 0 { 475 msg.serial = conn.getSerial() 476 } 477 if conn.outInt != nil { 478 conn.outInt(msg) 479 } 480 err := conn.outHandler.sendAndIfClosed(msg, ifClosed) 481 conn.calls.handleSendError(msg, err) 482 if err != nil { 483 conn.serialGen.RetireSerial(msg.serial) 484 } else if msg.Type != TypeMethodCall { 485 conn.serialGen.RetireSerial(msg.serial) 486 } 487 } 488 489 // Send sends the given message to the message bus. You usually don't need to 490 // use this; use the higher-level equivalents (Call / Go, Emit and Export) 491 // instead. If msg is a method call and NoReplyExpected is not set, a non-nil 492 // call is returned and the same value is sent to ch (which must be buffered) 493 // once the call is complete. Otherwise, ch is ignored and a Call structure is 494 // returned of which only the Err member is valid. 
495 func (conn *Conn) Send(msg *Message, ch chan *Call) *Call { 496 return conn.send(context.Background(), msg, ch) 497 } 498 499 // SendWithContext acts like Send but takes a context 500 func (conn *Conn) SendWithContext(ctx context.Context, msg *Message, ch chan *Call) *Call { 501 return conn.send(ctx, msg, ch) 502 } 503 504 func (conn *Conn) send(ctx context.Context, msg *Message, ch chan *Call) *Call { 505 if ctx == nil { 506 panic("nil context") 507 } 508 if ch == nil { 509 ch = make(chan *Call, 1) 510 } else if cap(ch) == 0 { 511 panic("dbus: unbuffered channel passed to (*Conn).Send") 512 } 513 514 var call *Call 515 ctx, canceler := context.WithCancel(ctx) 516 msg.serial = conn.getSerial() 517 if msg.Type == TypeMethodCall && msg.Flags&FlagNoReplyExpected == 0 { 518 call = new(Call) 519 call.Destination, _ = msg.Headers[FieldDestination].value.(string) 520 call.Path, _ = msg.Headers[FieldPath].value.(ObjectPath) 521 iface, _ := msg.Headers[FieldInterface].value.(string) 522 member, _ := msg.Headers[FieldMember].value.(string) 523 call.Method = iface + "." + member 524 call.Args = msg.Body 525 call.Done = ch 526 call.ctx = ctx 527 call.ctxCanceler = canceler 528 conn.calls.track(msg.serial, call) 529 go func() { 530 <-ctx.Done() 531 conn.calls.handleSendError(msg, ctx.Err()) 532 }() 533 conn.sendMessageAndIfClosed(msg, func() { 534 conn.calls.handleSendError(msg, ErrClosed) 535 canceler() 536 }) 537 } else { 538 canceler() 539 call = &Call{Err: nil, Done: ch} 540 ch <- call 541 conn.sendMessageAndIfClosed(msg, func() { 542 call = &Call{Err: ErrClosed} 543 }) 544 } 545 return call 546 } 547 548 // sendError creates an error message corresponding to the parameters and sends 549 // it to conn.out. 
550 func (conn *Conn) sendError(err error, dest string, serial uint32) { 551 var e *Error 552 switch em := err.(type) { 553 case Error: 554 e = &em 555 case *Error: 556 e = em 557 case DBusError: 558 name, body := em.DBusError() 559 e = NewError(name, body) 560 default: 561 e = MakeFailedError(err) 562 } 563 msg := new(Message) 564 msg.Type = TypeError 565 msg.Headers = make(map[HeaderField]Variant) 566 if dest != "" { 567 msg.Headers[FieldDestination] = MakeVariant(dest) 568 } 569 msg.Headers[FieldErrorName] = MakeVariant(e.Name) 570 msg.Headers[FieldReplySerial] = MakeVariant(serial) 571 msg.Body = e.Body 572 if len(e.Body) > 0 { 573 msg.Headers[FieldSignature] = MakeVariant(SignatureOf(e.Body...)) 574 } 575 conn.sendMessageAndIfClosed(msg, nil) 576 } 577 578 // sendReply creates a method reply message corresponding to the parameters and 579 // sends it to conn.out. 580 func (conn *Conn) sendReply(dest string, serial uint32, values ...interface{}) { 581 msg := new(Message) 582 msg.Type = TypeMethodReply 583 msg.Headers = make(map[HeaderField]Variant) 584 if dest != "" { 585 msg.Headers[FieldDestination] = MakeVariant(dest) 586 } 587 msg.Headers[FieldReplySerial] = MakeVariant(serial) 588 msg.Body = values 589 if len(values) > 0 { 590 msg.Headers[FieldSignature] = MakeVariant(SignatureOf(values...)) 591 } 592 conn.sendMessageAndIfClosed(msg, nil) 593 } 594 595 // AddMatchSignal registers the given match rule to receive broadcast 596 // signals based on their contents. 597 func (conn *Conn) AddMatchSignal(options ...MatchOption) error { 598 return conn.AddMatchSignalContext(context.Background(), options...) 599 } 600 601 // AddMatchSignalContext acts like AddMatchSignal but takes a context. 602 func (conn *Conn) AddMatchSignalContext(ctx context.Context, options ...MatchOption) error { 603 options = append([]MatchOption{withMatchType("signal")}, options...) 
604 return conn.busObj.CallWithContext( 605 ctx, 606 "org.freedesktop.DBus.AddMatch", 0, 607 formatMatchOptions(options), 608 ).Store() 609 } 610 611 // RemoveMatchSignal removes the first rule that matches previously registered with AddMatchSignal. 612 func (conn *Conn) RemoveMatchSignal(options ...MatchOption) error { 613 return conn.RemoveMatchSignalContext(context.Background(), options...) 614 } 615 616 // RemoveMatchSignalContext acts like RemoveMatchSignal but takes a context. 617 func (conn *Conn) RemoveMatchSignalContext(ctx context.Context, options ...MatchOption) error { 618 options = append([]MatchOption{withMatchType("signal")}, options...) 619 return conn.busObj.CallWithContext( 620 ctx, 621 "org.freedesktop.DBus.RemoveMatch", 0, 622 formatMatchOptions(options), 623 ).Store() 624 } 625 626 // Signal registers the given channel to be passed all received signal messages. 627 // 628 // Multiple of these channels can be registered at the same time. 629 // 630 // These channels are "overwritten" by Eavesdrop; i.e., if there currently is a 631 // channel for eavesdropped messages, this channel receives all signals, and 632 // none of the channels passed to Signal will receive any signals. 633 // 634 // Panics if the signal handler is not a `SignalRegistrar`. 635 func (conn *Conn) Signal(ch chan<- *Signal) { 636 handler, ok := conn.signalHandler.(SignalRegistrar) 637 if !ok { 638 panic("cannot use this method with a non SignalRegistrar handler") 639 } 640 handler.AddSignal(ch) 641 } 642 643 // RemoveSignal removes the given channel from the list of the registered channels. 644 // 645 // Panics if the signal handler is not a `SignalRegistrar`. 
646 func (conn *Conn) RemoveSignal(ch chan<- *Signal) { 647 handler, ok := conn.signalHandler.(SignalRegistrar) 648 if !ok { 649 panic("cannot use this method with a non SignalRegistrar handler") 650 } 651 handler.RemoveSignal(ch) 652 } 653 654 // SupportsUnixFDs returns whether the underlying transport supports passing of 655 // unix file descriptors. If this is false, method calls containing unix file 656 // descriptors will return an error and emitted signals containing them will 657 // not be sent. 658 func (conn *Conn) SupportsUnixFDs() bool { 659 return conn.unixFD 660 } 661 662 // Error represents a D-Bus message of type Error. 663 type Error struct { 664 Name string 665 Body []interface{} 666 } 667 668 func NewError(name string, body []interface{}) *Error { 669 return &Error{name, body} 670 } 671 672 func (e Error) Error() string { 673 if len(e.Body) >= 1 { 674 s, ok := e.Body[0].(string) 675 if ok { 676 return s 677 } 678 } 679 return e.Name 680 } 681 682 // Signal represents a D-Bus message of type Signal. The name member is given in 683 // "interface.member" notation, e.g. org.freedesktop.D-Bus.NameLost. 684 type Signal struct { 685 Sender string 686 Path ObjectPath 687 Name string 688 Body []interface{} 689 Sequence Sequence 690 } 691 692 // transport is a D-Bus transport. 693 type transport interface { 694 // Read and Write raw data (for example, for the authentication protocol). 695 io.ReadWriteCloser 696 697 // Send the initial null byte used for the EXTERNAL mechanism. 698 SendNullByte() error 699 700 // Returns whether this transport supports passing Unix FDs. 701 SupportsUnixFDs() bool 702 703 // Signal the transport that Unix FD passing is enabled for this connection. 704 EnableUnixFDs() 705 706 // Read / send a message, handling things like Unix FDs. 
707 ReadMessage() (*Message, error) 708 SendMessage(*Message) error 709 } 710 711 var ( 712 transports = make(map[string]func(string) (transport, error)) 713 ) 714 715 func getTransport(address string) (transport, error) { 716 var err error 717 var t transport 718 719 addresses := strings.Split(address, ";") 720 for _, v := range addresses { 721 i := strings.IndexRune(v, ':') 722 if i == -1 { 723 err = errors.New("dbus: invalid bus address (no transport)") 724 continue 725 } 726 f := transports[v[:i]] 727 if f == nil { 728 err = errors.New("dbus: invalid bus address (invalid or unsupported transport)") 729 continue 730 } 731 t, err = f(v[i+1:]) 732 if err == nil { 733 return t, nil 734 } 735 } 736 return nil, err 737 } 738 739 // getKey gets a key from a the list of keys. Returns "" on error / not found... 740 func getKey(s, key string) string { 741 for _, keyEqualsValue := range strings.Split(s, ",") { 742 keyValue := strings.SplitN(keyEqualsValue, "=", 2) 743 if len(keyValue) == 2 && keyValue[0] == key { 744 return keyValue[1] 745 } 746 } 747 return "" 748 } 749 750 type outputHandler struct { 751 conn *Conn 752 sendLck sync.Mutex 753 closed struct { 754 isClosed bool 755 lck sync.RWMutex 756 } 757 } 758 759 func (h *outputHandler) sendAndIfClosed(msg *Message, ifClosed func()) error { 760 h.closed.lck.RLock() 761 defer h.closed.lck.RUnlock() 762 if h.closed.isClosed { 763 if ifClosed != nil { 764 ifClosed() 765 } 766 return nil 767 } 768 h.sendLck.Lock() 769 defer h.sendLck.Unlock() 770 return h.conn.SendMessage(msg) 771 } 772 773 func (h *outputHandler) close() { 774 h.closed.lck.Lock() 775 defer h.closed.lck.Unlock() 776 h.closed.isClosed = true 777 } 778 779 type serialGenerator struct { 780 lck sync.Mutex 781 nextSerial uint32 782 serialUsed map[uint32]bool 783 } 784 785 func newSerialGenerator() *serialGenerator { 786 return &serialGenerator{ 787 serialUsed: map[uint32]bool{0: true}, 788 nextSerial: 1, 789 } 790 } 791 792 func (gen *serialGenerator) 
GetSerial() uint32 { 793 gen.lck.Lock() 794 defer gen.lck.Unlock() 795 n := gen.nextSerial 796 for gen.serialUsed[n] { 797 n++ 798 } 799 gen.serialUsed[n] = true 800 gen.nextSerial = n + 1 801 return n 802 } 803 804 func (gen *serialGenerator) RetireSerial(serial uint32) { 805 gen.lck.Lock() 806 defer gen.lck.Unlock() 807 delete(gen.serialUsed, serial) 808 } 809 810 type nameTracker struct { 811 lck sync.RWMutex 812 unique string 813 names map[string]struct{} 814 } 815 816 func newNameTracker() *nameTracker { 817 return &nameTracker{names: map[string]struct{}{}} 818 } 819 func (tracker *nameTracker) acquireUniqueConnectionName(name string) { 820 tracker.lck.Lock() 821 defer tracker.lck.Unlock() 822 tracker.unique = name 823 } 824 func (tracker *nameTracker) acquireName(name string) { 825 tracker.lck.Lock() 826 defer tracker.lck.Unlock() 827 tracker.names[name] = struct{}{} 828 } 829 func (tracker *nameTracker) loseName(name string) { 830 tracker.lck.Lock() 831 defer tracker.lck.Unlock() 832 delete(tracker.names, name) 833 } 834 835 func (tracker *nameTracker) uniqueNameIsKnown() bool { 836 tracker.lck.RLock() 837 defer tracker.lck.RUnlock() 838 return tracker.unique != "" 839 } 840 func (tracker *nameTracker) isKnownName(name string) bool { 841 tracker.lck.RLock() 842 defer tracker.lck.RUnlock() 843 _, ok := tracker.names[name] 844 return ok || name == tracker.unique 845 } 846 func (tracker *nameTracker) listKnownNames() []string { 847 tracker.lck.RLock() 848 defer tracker.lck.RUnlock() 849 out := make([]string, 0, len(tracker.names)+1) 850 out = append(out, tracker.unique) 851 for k := range tracker.names { 852 out = append(out, k) 853 } 854 return out 855 } 856 857 type callTracker struct { 858 calls map[uint32]*Call 859 lck sync.RWMutex 860 } 861 862 func newCallTracker() *callTracker { 863 return &callTracker{calls: map[uint32]*Call{}} 864 } 865 866 func (tracker *callTracker) track(sn uint32, call *Call) { 867 tracker.lck.Lock() 868 tracker.calls[sn] = call 
869 tracker.lck.Unlock() 870 } 871 872 func (tracker *callTracker) handleReply(sequence Sequence, msg *Message) uint32 { 873 serial := msg.Headers[FieldReplySerial].value.(uint32) 874 tracker.lck.RLock() 875 _, ok := tracker.calls[serial] 876 tracker.lck.RUnlock() 877 if ok { 878 tracker.finalizeWithBody(serial, sequence, msg.Body) 879 } 880 return serial 881 } 882 883 func (tracker *callTracker) handleDBusError(sequence Sequence, msg *Message) uint32 { 884 serial := msg.Headers[FieldReplySerial].value.(uint32) 885 tracker.lck.RLock() 886 _, ok := tracker.calls[serial] 887 tracker.lck.RUnlock() 888 if ok { 889 name, _ := msg.Headers[FieldErrorName].value.(string) 890 tracker.finalizeWithError(serial, sequence, Error{name, msg.Body}) 891 } 892 return serial 893 } 894 895 func (tracker *callTracker) handleSendError(msg *Message, err error) { 896 if err == nil { 897 return 898 } 899 tracker.lck.RLock() 900 _, ok := tracker.calls[msg.serial] 901 tracker.lck.RUnlock() 902 if ok { 903 tracker.finalizeWithError(msg.serial, NoSequence, err) 904 } 905 } 906 907 // finalize was the only func that did not strobe Done 908 func (tracker *callTracker) finalize(sn uint32) { 909 tracker.lck.Lock() 910 defer tracker.lck.Unlock() 911 c, ok := tracker.calls[sn] 912 if ok { 913 delete(tracker.calls, sn) 914 c.ContextCancel() 915 } 916 } 917 918 func (tracker *callTracker) finalizeWithBody(sn uint32, sequence Sequence, body []interface{}) { 919 tracker.lck.Lock() 920 c, ok := tracker.calls[sn] 921 if ok { 922 delete(tracker.calls, sn) 923 } 924 tracker.lck.Unlock() 925 if ok { 926 c.Body = body 927 c.ResponseSequence = sequence 928 c.done() 929 } 930 } 931 932 func (tracker *callTracker) finalizeWithError(sn uint32, sequence Sequence, err error) { 933 tracker.lck.Lock() 934 c, ok := tracker.calls[sn] 935 if ok { 936 delete(tracker.calls, sn) 937 } 938 tracker.lck.Unlock() 939 if ok { 940 c.Err = err 941 c.ResponseSequence = sequence 942 c.done() 943 } 944 } 945 946 func (tracker 
*callTracker) finalizeAllWithError(sequenceGen *sequenceGenerator, err error) { 947 tracker.lck.Lock() 948 closedCalls := make([]*Call, 0, len(tracker.calls)) 949 for sn := range tracker.calls { 950 closedCalls = append(closedCalls, tracker.calls[sn]) 951 } 952 tracker.calls = map[uint32]*Call{} 953 tracker.lck.Unlock() 954 for _, call := range closedCalls { 955 call.Err = err 956 call.ResponseSequence = sequenceGen.next() 957 call.done() 958 } 959 }