channel.go (16166B)
1 // Copyright 2011 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package ssh 6 7 import ( 8 "encoding/binary" 9 "errors" 10 "fmt" 11 "io" 12 "log" 13 "sync" 14 ) 15 16 const ( 17 minPacketLength = 9 18 // channelMaxPacket contains the maximum number of bytes that will be 19 // sent in a single packet. As per RFC 4253, section 6.1, 32k is also 20 // the minimum. 21 channelMaxPacket = 1 << 15 22 // We follow OpenSSH here. 23 channelWindowSize = 64 * channelMaxPacket 24 ) 25 26 // NewChannel represents an incoming request to a channel. It must either be 27 // accepted for use by calling Accept, or rejected by calling Reject. 28 type NewChannel interface { 29 // Accept accepts the channel creation request. It returns the Channel 30 // and a Go channel containing SSH requests. The Go channel must be 31 // serviced otherwise the Channel will hang. 32 Accept() (Channel, <-chan *Request, error) 33 34 // Reject rejects the channel creation request. After calling 35 // this, no other methods on the Channel may be called. 36 Reject(reason RejectionReason, message string) error 37 38 // ChannelType returns the type of the channel, as supplied by the 39 // client. 40 ChannelType() string 41 42 // ExtraData returns the arbitrary payload for this channel, as supplied 43 // by the client. This data is specific to the channel type. 44 ExtraData() []byte 45 } 46 47 // A Channel is an ordered, reliable, flow-controlled, duplex stream 48 // that is multiplexed over an SSH connection. 49 type Channel interface { 50 // Read reads up to len(data) bytes from the channel. 51 Read(data []byte) (int, error) 52 53 // Write writes len(data) bytes to the channel. 54 Write(data []byte) (int, error) 55 56 // Close signals end of channel use. No data may be sent after this 57 // call. 58 Close() error 59 60 // CloseWrite signals the end of sending in-band 61 // data. Requests may still be sent, and the other side may 62 // still send data 63 CloseWrite() error 64 65 // SendRequest sends a channel request. If wantReply is true, 66 // it will wait for a reply and return the result as a 67 // boolean, otherwise the return value will be false. Channel 68 // requests are out-of-band messages so they may be sent even 69 // if the data stream is closed or blocked by flow control. 70 // If the channel is closed before a reply is returned, io.EOF 71 // is returned. 72 SendRequest(name string, wantReply bool, payload []byte) (bool, error) 73 74 // Stderr returns an io.ReadWriter that writes to this channel 75 // with the extended data type set to stderr. Stderr may 76 // safely be read and written from a different goroutine than 77 // Read and Write respectively. 78 Stderr() io.ReadWriter 79 } 80 81 // Request is a request sent outside of the normal stream of 82 // data. Requests can either be specific to an SSH channel, or they 83 // can be global. 84 type Request struct { 85 Type string 86 WantReply bool 87 Payload []byte 88 89 ch *channel 90 mux *mux 91 } 92 93 // Reply sends a response to a request. It must be called for all requests 94 // where WantReply is true and is a no-op otherwise. The payload argument is 95 // ignored for replies to channel-specific requests. 96 func (r *Request) Reply(ok bool, payload []byte) error { 97 if !r.WantReply { 98 return nil 99 } 100 101 if r.ch == nil { 102 return r.mux.ackRequest(ok, payload) 103 } 104 105 return r.ch.ackRequest(ok) 106 } 107 108 // RejectionReason is an enumeration used when rejecting channel creation 109 // requests. See RFC 4254, section 5.1. 110 type RejectionReason uint32 111 112 const ( 113 Prohibited RejectionReason = iota + 1 114 ConnectionFailed 115 UnknownChannelType 116 ResourceShortage 117 ) 118 119 // String converts the rejection reason to human readable form. 120 func (r RejectionReason) String() string { 121 switch r { 122 case Prohibited: 123 return "administratively prohibited" 124 case ConnectionFailed: 125 return "connect failed" 126 case UnknownChannelType: 127 return "unknown channel type" 128 case ResourceShortage: 129 return "resource shortage" 130 } 131 return fmt.Sprintf("unknown reason %d", int(r)) 132 } 133 134 func min(a uint32, b int) uint32 { 135 if a < uint32(b) { 136 return a 137 } 138 return uint32(b) 139 } 140 141 type channelDirection uint8 142 143 const ( 144 channelInbound channelDirection = iota 145 channelOutbound 146 ) 147 148 // channel is an implementation of the Channel interface that works 149 // with the mux class. 150 type channel struct { 151 // R/O after creation 152 chanType string 153 extraData []byte 154 localId, remoteId uint32 155 156 // maxIncomingPayload and maxRemotePayload are the maximum 157 // payload sizes of normal and extended data packets for 158 // receiving and sending, respectively. The wire packet will 159 // be 9 or 13 bytes larger (excluding encryption overhead). 160 maxIncomingPayload uint32 161 maxRemotePayload uint32 162 163 mux *mux 164 165 // decided is set to true if an accept or reject message has been sent 166 // (for outbound channels) or received (for inbound channels). 167 decided bool 168 169 // direction contains either channelOutbound, for channels created 170 // locally, or channelInbound, for channels created by the peer. 171 direction channelDirection 172 173 // Pending internal channel messages. 174 msg chan interface{} 175 176 // Since requests have no ID, there can be only one request 177 // with WantReply=true outstanding. This lock is held by a 178 // goroutine that has such an outgoing request pending. 179 sentRequestMu sync.Mutex 180 181 incomingRequests chan *Request 182 183 sentEOF bool 184 185 // thread-safe data 186 remoteWin window 187 pending *buffer 188 extPending *buffer 189 190 // windowMu protects myWindow, the flow-control window. 191 windowMu sync.Mutex 192 myWindow uint32 193 194 // writeMu serializes calls to mux.conn.writePacket() and 195 // protects sentClose and packetPool. This mutex must be 196 // different from windowMu, as writePacket can block if there 197 // is a key exchange pending. 198 writeMu sync.Mutex 199 sentClose bool 200 201 // packetPool has a buffer for each extended channel ID to 202 // save allocations during writes. 203 packetPool map[uint32][]byte 204 } 205 206 // writePacket sends a packet. If the packet is a channel close, it updates 207 // sentClose. This method takes the lock c.writeMu. 208 func (ch *channel) writePacket(packet []byte) error { 209 ch.writeMu.Lock() 210 if ch.sentClose { 211 ch.writeMu.Unlock() 212 return io.EOF 213 } 214 ch.sentClose = (packet[0] == msgChannelClose) 215 err := ch.mux.conn.writePacket(packet) 216 ch.writeMu.Unlock() 217 return err 218 } 219 220 func (ch *channel) sendMessage(msg interface{}) error { 221 if debugMux { 222 log.Printf("send(%d): %#v", ch.mux.chanList.offset, msg) 223 } 224 225 p := Marshal(msg) 226 binary.BigEndian.PutUint32(p[1:], ch.remoteId) 227 return ch.writePacket(p) 228 } 229 230 // WriteExtended writes data to a specific extended stream. These streams are 231 // used, for example, for stderr. 232 func (ch *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) { 233 if ch.sentEOF { 234 return 0, io.EOF 235 } 236 // 1 byte message type, 4 bytes remoteId, 4 bytes data length 237 opCode := byte(msgChannelData) 238 headerLength := uint32(9) 239 if extendedCode > 0 { 240 headerLength += 4 241 opCode = msgChannelExtendedData 242 } 243 244 ch.writeMu.Lock() 245 packet := ch.packetPool[extendedCode] 246 // We don't remove the buffer from packetPool, so 247 // WriteExtended calls from different goroutines will be 248 // flagged as errors by the race detector. 249 ch.writeMu.Unlock() 250 251 for len(data) > 0 { 252 space := min(ch.maxRemotePayload, len(data)) 253 if space, err = ch.remoteWin.reserve(space); err != nil { 254 return n, err 255 } 256 if want := headerLength + space; uint32(cap(packet)) < want { 257 packet = make([]byte, want) 258 } else { 259 packet = packet[:want] 260 } 261 262 todo := data[:space] 263 264 packet[0] = opCode 265 binary.BigEndian.PutUint32(packet[1:], ch.remoteId) 266 if extendedCode > 0 { 267 binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode)) 268 } 269 binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo))) 270 copy(packet[headerLength:], todo) 271 if err = ch.writePacket(packet); err != nil { 272 return n, err 273 } 274 275 n += len(todo) 276 data = data[len(todo):] 277 } 278 279 ch.writeMu.Lock() 280 ch.packetPool[extendedCode] = packet 281 ch.writeMu.Unlock() 282 283 return n, err 284 } 285 286 func (ch *channel) handleData(packet []byte) error { 287 headerLen := 9 288 isExtendedData := packet[0] == msgChannelExtendedData 289 if isExtendedData { 290 headerLen = 13 291 } 292 if len(packet) < headerLen { 293 // malformed data packet 294 return parseError(packet[0]) 295 } 296 297 var extended uint32 298 if isExtendedData { 299 extended = binary.BigEndian.Uint32(packet[5:]) 300 } 301 302 length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen]) 303 if length == 0 { 304 return nil 305 } 306 if length > ch.maxIncomingPayload { 307 // TODO(hanwen): should send Disconnect? 308 return errors.New("ssh: incoming packet exceeds maximum payload size") 309 } 310 311 data := packet[headerLen:] 312 if length != uint32(len(data)) { 313 return errors.New("ssh: wrong packet length") 314 } 315 316 ch.windowMu.Lock() 317 if ch.myWindow < length { 318 ch.windowMu.Unlock() 319 // TODO(hanwen): should send Disconnect with reason? 320 return errors.New("ssh: remote side wrote too much") 321 } 322 ch.myWindow -= length 323 ch.windowMu.Unlock() 324 325 if extended == 1 { 326 ch.extPending.write(data) 327 } else if extended > 0 { 328 // discard other extended data. 329 } else { 330 ch.pending.write(data) 331 } 332 return nil 333 } 334 335 func (c *channel) adjustWindow(n uint32) error { 336 c.windowMu.Lock() 337 // Since myWindow is managed on our side, and can never exceed 338 // the initial window setting, we don't worry about overflow. 339 c.myWindow += uint32(n) 340 c.windowMu.Unlock() 341 return c.sendMessage(windowAdjustMsg{ 342 AdditionalBytes: uint32(n), 343 }) 344 } 345 346 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) { 347 switch extended { 348 case 1: 349 n, err = c.extPending.Read(data) 350 case 0: 351 n, err = c.pending.Read(data) 352 default: 353 return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended) 354 } 355 356 if n > 0 { 357 err = c.adjustWindow(uint32(n)) 358 // sendWindowAdjust can return io.EOF if the remote 359 // peer has closed the connection, however we want to 360 // defer forwarding io.EOF to the caller of Read until 361 // the buffer has been drained. 362 if n > 0 && err == io.EOF { 363 err = nil 364 } 365 } 366 367 return n, err 368 } 369 370 func (c *channel) close() { 371 c.pending.eof() 372 c.extPending.eof() 373 close(c.msg) 374 close(c.incomingRequests) 375 c.writeMu.Lock() 376 // This is not necessary for a normal channel teardown, but if 377 // there was another error, it is. 378 c.sentClose = true 379 c.writeMu.Unlock() 380 // Unblock writers. 381 c.remoteWin.close() 382 } 383 384 // responseMessageReceived is called when a success or failure message is 385 // received on a channel to check that such a message is reasonable for the 386 // given channel. 387 func (ch *channel) responseMessageReceived() error { 388 if ch.direction == channelInbound { 389 return errors.New("ssh: channel response message received on inbound channel") 390 } 391 if ch.decided { 392 return errors.New("ssh: duplicate response received for channel") 393 } 394 ch.decided = true 395 return nil 396 } 397 398 func (ch *channel) handlePacket(packet []byte) error { 399 switch packet[0] { 400 case msgChannelData, msgChannelExtendedData: 401 return ch.handleData(packet) 402 case msgChannelClose: 403 ch.sendMessage(channelCloseMsg{PeersID: ch.remoteId}) 404 ch.mux.chanList.remove(ch.localId) 405 ch.close() 406 return nil 407 case msgChannelEOF: 408 // RFC 4254 is mute on how EOF affects dataExt messages but 409 // it is logical to signal EOF at the same time. 410 ch.extPending.eof() 411 ch.pending.eof() 412 return nil 413 } 414 415 decoded, err := decode(packet) 416 if err != nil { 417 return err 418 } 419 420 switch msg := decoded.(type) { 421 case *channelOpenFailureMsg: 422 if err := ch.responseMessageReceived(); err != nil { 423 return err 424 } 425 ch.mux.chanList.remove(msg.PeersID) 426 ch.msg <- msg 427 case *channelOpenConfirmMsg: 428 if err := ch.responseMessageReceived(); err != nil { 429 return err 430 } 431 if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 { 432 return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize) 433 } 434 ch.remoteId = msg.MyID 435 ch.maxRemotePayload = msg.MaxPacketSize 436 ch.remoteWin.add(msg.MyWindow) 437 ch.msg <- msg 438 case *windowAdjustMsg: 439 if !ch.remoteWin.add(msg.AdditionalBytes) { 440 return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes) 441 } 442 case *channelRequestMsg: 443 req := Request{ 444 Type: msg.Request, 445 WantReply: msg.WantReply, 446 Payload: msg.RequestSpecificData, 447 ch: ch, 448 } 449 450 ch.incomingRequests <- &req 451 default: 452 ch.msg <- msg 453 } 454 return nil 455 } 456 457 func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel { 458 ch := &channel{ 459 remoteWin: window{Cond: newCond()}, 460 myWindow: channelWindowSize, 461 pending: newBuffer(), 462 extPending: newBuffer(), 463 direction: direction, 464 incomingRequests: make(chan *Request, chanSize), 465 msg: make(chan interface{}, chanSize), 466 chanType: chanType, 467 extraData: extraData, 468 mux: m, 469 packetPool: make(map[uint32][]byte), 470 } 471 ch.localId = m.chanList.add(ch) 472 return ch 473 } 474 475 var errUndecided = errors.New("ssh: must Accept or Reject channel") 476 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once") 477 478 type extChannel struct { 479 code uint32 480 ch *channel 481 } 482 483 func (e *extChannel) Write(data []byte) (n int, err error) { 484 return e.ch.WriteExtended(data, e.code) 485 } 486 487 func (e *extChannel) Read(data []byte) (n int, err error) { 488 return e.ch.ReadExtended(data, e.code) 489 } 490 491 func (ch *channel) Accept() (Channel, <-chan *Request, error) { 492 if ch.decided { 493 return nil, nil, errDecidedAlready 494 } 495 ch.maxIncomingPayload = channelMaxPacket 496 confirm := channelOpenConfirmMsg{ 497 PeersID: ch.remoteId, 498 MyID: ch.localId, 499 MyWindow: ch.myWindow, 500 MaxPacketSize: ch.maxIncomingPayload, 501 } 502 ch.decided = true 503 if err := ch.sendMessage(confirm); err != nil { 504 return nil, nil, err 505 } 506 507 return ch, ch.incomingRequests, nil 508 } 509 510 func (ch *channel) Reject(reason RejectionReason, message string) error { 511 if ch.decided { 512 return errDecidedAlready 513 } 514 reject := channelOpenFailureMsg{ 515 PeersID: ch.remoteId, 516 Reason: reason, 517 Message: message, 518 Language: "en", 519 } 520 ch.decided = true 521 return ch.sendMessage(reject) 522 } 523 524 func (ch *channel) Read(data []byte) (int, error) { 525 if !ch.decided { 526 return 0, errUndecided 527 } 528 return ch.ReadExtended(data, 0) 529 } 530 531 func (ch *channel) Write(data []byte) (int, error) { 532 if !ch.decided { 533 return 0, errUndecided 534 } 535 return ch.WriteExtended(data, 0) 536 } 537 538 func (ch *channel) CloseWrite() error { 539 if !ch.decided { 540 return errUndecided 541 } 542 ch.sentEOF = true 543 return ch.sendMessage(channelEOFMsg{ 544 PeersID: ch.remoteId}) 545 } 546 547 func (ch *channel) Close() error { 548 if !ch.decided { 549 return errUndecided 550 } 551 552 return ch.sendMessage(channelCloseMsg{ 553 PeersID: ch.remoteId}) 554 } 555 556 // Extended returns an io.ReadWriter that sends and receives data on the given, 557 // SSH extended stream. Such streams are used, for example, for stderr. 558 func (ch *channel) Extended(code uint32) io.ReadWriter { 559 if !ch.decided { 560 return nil 561 } 562 return &extChannel{code, ch} 563 } 564 565 func (ch *channel) Stderr() io.ReadWriter { 566 return ch.Extended(1) 567 } 568 569 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) { 570 if !ch.decided { 571 return false, errUndecided 572 } 573 574 if wantReply { 575 ch.sentRequestMu.Lock() 576 defer ch.sentRequestMu.Unlock() 577 } 578 579 msg := channelRequestMsg{ 580 PeersID: ch.remoteId, 581 Request: name, 582 WantReply: wantReply, 583 RequestSpecificData: payload, 584 } 585 586 if err := ch.sendMessage(msg); err != nil { 587 return false, err 588 } 589 590 if wantReply { 591 m, ok := (<-ch.msg) 592 if !ok { 593 return false, io.EOF 594 } 595 switch m.(type) { 596 case *channelRequestFailureMsg: 597 return false, nil 598 case *channelRequestSuccessMsg: 599 return true, nil 600 default: 601 return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m) 602 } 603 } 604 605 return false, nil 606 } 607 608 // ackRequest either sends an ack or nack to the channel request. 609 func (ch *channel) ackRequest(ok bool) error { 610 if !ch.decided { 611 return errUndecided 612 } 613 614 var msg interface{} 615 if !ok { 616 msg = channelRequestFailureMsg{ 617 PeersID: ch.remoteId, 618 } 619 } else { 620 msg = channelRequestSuccessMsg{ 621 PeersID: ch.remoteId, 622 } 623 } 624 return ch.sendMessage(msg) 625 } 626 627 func (ch *channel) ChannelType() string { 628 return ch.chanType 629 } 630 631 func (ch *channel) ExtraData() []byte { 632 return ch.extraData 633 }