// cgroup.go (13772B)
1 /* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package cgroup1 18 19 import ( 20 "errors" 21 "fmt" 22 "io/fs" 23 "os" 24 "path/filepath" 25 "strconv" 26 "strings" 27 "sync" 28 "syscall" 29 "time" 30 31 v1 "github.com/containerd/cgroups/v3/cgroup1/stats" 32 33 "github.com/opencontainers/runtime-spec/specs-go" 34 ) 35 36 // New returns a new control via the cgroup cgroups interface 37 func New(path Path, resources *specs.LinuxResources, opts ...InitOpts) (Cgroup, error) { 38 config := newInitConfig() 39 for _, o := range opts { 40 if err := o(config); err != nil { 41 return nil, err 42 } 43 } 44 subsystems, err := config.hiearchy() 45 if err != nil { 46 return nil, err 47 } 48 var active []Subsystem 49 for _, s := range subsystems { 50 // check if subsystem exists 51 if err := initializeSubsystem(s, path, resources); err != nil { 52 if err == ErrControllerNotActive { 53 if config.InitCheck != nil { 54 if skerr := config.InitCheck(s, path, err); skerr != nil { 55 if skerr != ErrIgnoreSubsystem { 56 return nil, skerr 57 } 58 } 59 } 60 continue 61 } 62 return nil, err 63 } 64 active = append(active, s) 65 } 66 return &cgroup{ 67 path: path, 68 subsystems: active, 69 }, nil 70 } 71 72 // Load will load an existing cgroup and allow it to be controlled 73 // All static path should not include `/sys/fs/cgroup/` prefix, it should start with your own cgroups name 74 func Load(path Path, opts ...InitOpts) (Cgroup, error) { 75 config 
:= newInitConfig() 76 for _, o := range opts { 77 if err := o(config); err != nil { 78 return nil, err 79 } 80 } 81 var activeSubsystems []Subsystem 82 subsystems, err := config.hiearchy() 83 if err != nil { 84 return nil, err 85 } 86 // check that the subsystems still exist, and keep only those that actually exist 87 for _, s := range pathers(subsystems) { 88 p, err := path(s.Name()) 89 if err != nil { 90 if errors.Is(err, os.ErrNotExist) { 91 return nil, ErrCgroupDeleted 92 } 93 if err == ErrControllerNotActive { 94 if config.InitCheck != nil { 95 if skerr := config.InitCheck(s, path, err); skerr != nil { 96 if skerr != ErrIgnoreSubsystem { 97 return nil, skerr 98 } 99 } 100 } 101 continue 102 } 103 return nil, err 104 } 105 if _, err := os.Lstat(s.Path(p)); err != nil { 106 if os.IsNotExist(err) { 107 continue 108 } 109 return nil, err 110 } 111 activeSubsystems = append(activeSubsystems, s) 112 } 113 // if we do not have any active systems then the cgroup is deleted 114 if len(activeSubsystems) == 0 { 115 return nil, ErrCgroupDeleted 116 } 117 return &cgroup{ 118 path: path, 119 subsystems: activeSubsystems, 120 }, nil 121 } 122 123 type cgroup struct { 124 path Path 125 126 subsystems []Subsystem 127 mu sync.Mutex 128 err error 129 } 130 131 // New returns a new sub cgroup 132 func (c *cgroup) New(name string, resources *specs.LinuxResources) (Cgroup, error) { 133 c.mu.Lock() 134 defer c.mu.Unlock() 135 if c.err != nil { 136 return nil, c.err 137 } 138 path := subPath(c.path, name) 139 for _, s := range c.subsystems { 140 if err := initializeSubsystem(s, path, resources); err != nil { 141 return nil, err 142 } 143 } 144 return &cgroup{ 145 path: path, 146 subsystems: c.subsystems, 147 }, nil 148 } 149 150 // Subsystems returns all the subsystems that are currently being 151 // consumed by the group 152 func (c *cgroup) Subsystems() []Subsystem { 153 return c.subsystems 154 } 155 156 func (c *cgroup) subsystemsFilter(subsystems ...Name) []Subsystem { 157 if 
len(subsystems) == 0 { 158 return c.subsystems 159 } 160 161 var filteredSubsystems = []Subsystem{} 162 for _, s := range c.subsystems { 163 for _, f := range subsystems { 164 if s.Name() == f { 165 filteredSubsystems = append(filteredSubsystems, s) 166 break 167 } 168 } 169 } 170 171 return filteredSubsystems 172 } 173 174 // Add moves the provided process into the new cgroup. 175 // Without additional arguments, the process is added to all the cgroup subsystems. 176 // When giving Add a list of subsystem names, the process is only added to those 177 // subsystems, provided that they are active in the targeted cgroup. 178 func (c *cgroup) Add(process Process, subsystems ...Name) error { 179 return c.add(process, cgroupProcs, subsystems...) 180 } 181 182 // AddProc moves the provided process id into the new cgroup. 183 // Without additional arguments, the process with the given id is added to all 184 // the cgroup subsystems. When giving AddProc a list of subsystem names, the process 185 // id is only added to those subsystems, provided that they are active in the targeted 186 // cgroup. 187 func (c *cgroup) AddProc(pid uint64, subsystems ...Name) error { 188 return c.add(Process{Pid: int(pid)}, cgroupProcs, subsystems...) 189 } 190 191 // AddTask moves the provided tasks (threads) into the new cgroup. 192 // Without additional arguments, the task is added to all the cgroup subsystems. 193 // When giving AddTask a list of subsystem names, the task is only added to those 194 // subsystems, provided that they are active in the targeted cgroup. 195 func (c *cgroup) AddTask(process Process, subsystems ...Name) error { 196 return c.add(process, cgroupTasks, subsystems...) 197 } 198 199 // writeCgroupsProcs writes to the file, but retries on EINVAL. 
200 func writeCgroupProcs(path string, content []byte, perm fs.FileMode) error { 201 f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, perm) 202 if err != nil { 203 return err 204 } 205 defer f.Close() 206 207 for i := 0; i < 5; i++ { 208 _, err = f.Write(content) 209 if err == nil { 210 return nil 211 } 212 // If the process's associated task's state is TASK_NEW, the kernel 213 // returns EINVAL. The function will retry on the error like runc. 214 // https://github.com/torvalds/linux/blob/v6.0/kernel/sched/core.c#L10308-L10337 215 // https://github.com/opencontainers/runc/pull/1950 216 if !errors.Is(err, syscall.EINVAL) { 217 return err 218 } 219 time.Sleep(30 * time.Millisecond) 220 } 221 return err 222 } 223 224 func (c *cgroup) add(process Process, pType procType, subsystems ...Name) error { 225 if process.Pid <= 0 { 226 return ErrInvalidPid 227 } 228 c.mu.Lock() 229 defer c.mu.Unlock() 230 if c.err != nil { 231 return c.err 232 } 233 for _, s := range pathers(c.subsystemsFilter(subsystems...)) { 234 p, err := c.path(s.Name()) 235 if err != nil { 236 return err 237 } 238 err = writeCgroupProcs( 239 filepath.Join(s.Path(p), pType), 240 []byte(strconv.Itoa(process.Pid)), 241 defaultFilePerm, 242 ) 243 if err != nil { 244 return err 245 } 246 } 247 return nil 248 } 249 250 // Delete will remove the control group from each of the subsystems registered 251 func (c *cgroup) Delete() error { 252 c.mu.Lock() 253 defer c.mu.Unlock() 254 if c.err != nil { 255 return c.err 256 } 257 var errs []string 258 for _, s := range c.subsystems { 259 // kernel prevents cgroups with running process from being removed, check the tree is empty 260 procs, err := c.processes(s.Name(), true, cgroupProcs) 261 if err != nil { 262 return err 263 } 264 if len(procs) > 0 { 265 errs = append(errs, fmt.Sprintf("%s (contains running processes)", string(s.Name()))) 266 continue 267 } 268 if d, ok := s.(deleter); ok { 269 sp, err := c.path(s.Name()) 270 if err != nil { 271 return err 272 } 273 
if err := d.Delete(sp); err != nil { 274 errs = append(errs, string(s.Name())) 275 } 276 continue 277 } 278 if p, ok := s.(pather); ok { 279 sp, err := c.path(s.Name()) 280 if err != nil { 281 return err 282 } 283 path := p.Path(sp) 284 if err := remove(path); err != nil { 285 errs = append(errs, path) 286 } 287 continue 288 } 289 } 290 if len(errs) > 0 { 291 return fmt.Errorf("cgroups: unable to remove paths %s", strings.Join(errs, ", ")) 292 } 293 c.err = ErrCgroupDeleted 294 return nil 295 } 296 297 // Stat returns the current metrics for the cgroup 298 func (c *cgroup) Stat(handlers ...ErrorHandler) (*v1.Metrics, error) { 299 c.mu.Lock() 300 defer c.mu.Unlock() 301 if c.err != nil { 302 return nil, c.err 303 } 304 if len(handlers) == 0 { 305 handlers = append(handlers, errPassthrough) 306 } 307 var ( 308 stats = &v1.Metrics{ 309 CPU: &v1.CPUStat{ 310 Throttling: &v1.Throttle{}, 311 Usage: &v1.CPUUsage{}, 312 }, 313 } 314 wg = &sync.WaitGroup{} 315 errs = make(chan error, len(c.subsystems)) 316 ) 317 for _, s := range c.subsystems { 318 if ss, ok := s.(stater); ok { 319 sp, err := c.path(s.Name()) 320 if err != nil { 321 return nil, err 322 } 323 wg.Add(1) 324 go func() { 325 defer wg.Done() 326 if err := ss.Stat(sp, stats); err != nil { 327 for _, eh := range handlers { 328 if herr := eh(err); herr != nil { 329 errs <- herr 330 } 331 } 332 } 333 }() 334 } 335 } 336 wg.Wait() 337 close(errs) 338 for err := range errs { 339 return nil, err 340 } 341 return stats, nil 342 } 343 344 // Update updates the cgroup with the new resource values provided 345 // 346 // Be prepared to handle EBUSY when trying to update a cgroup with 347 // live processes and other operations like Stats being performed at the 348 // same time 349 func (c *cgroup) Update(resources *specs.LinuxResources) error { 350 c.mu.Lock() 351 defer c.mu.Unlock() 352 if c.err != nil { 353 return c.err 354 } 355 for _, s := range c.subsystems { 356 if u, ok := s.(updater); ok { 357 sp, err := 
c.path(s.Name()) 358 if err != nil { 359 return err 360 } 361 if err := u.Update(sp, resources); err != nil { 362 return err 363 } 364 } 365 } 366 return nil 367 } 368 369 // Processes returns the processes running inside the cgroup along 370 // with the subsystem used, pid, and path 371 func (c *cgroup) Processes(subsystem Name, recursive bool) ([]Process, error) { 372 c.mu.Lock() 373 defer c.mu.Unlock() 374 if c.err != nil { 375 return nil, c.err 376 } 377 return c.processes(subsystem, recursive, cgroupProcs) 378 } 379 380 // Tasks returns the tasks running inside the cgroup along 381 // with the subsystem used, pid, and path 382 func (c *cgroup) Tasks(subsystem Name, recursive bool) ([]Task, error) { 383 c.mu.Lock() 384 defer c.mu.Unlock() 385 if c.err != nil { 386 return nil, c.err 387 } 388 return c.processes(subsystem, recursive, cgroupTasks) 389 } 390 391 func (c *cgroup) processes(subsystem Name, recursive bool, pType procType) ([]Process, error) { 392 s := c.getSubsystem(subsystem) 393 sp, err := c.path(subsystem) 394 if err != nil { 395 return nil, err 396 } 397 if s == nil { 398 return nil, fmt.Errorf("cgroups: %s doesn't exist in %s subsystem", sp, subsystem) 399 } 400 path := s.(pather).Path(sp) 401 var processes []Process 402 err = filepath.Walk(path, func(p string, info os.FileInfo, err error) error { 403 if err != nil { 404 return err 405 } 406 if !recursive && info.IsDir() { 407 if p == path { 408 return nil 409 } 410 return filepath.SkipDir 411 } 412 dir, name := filepath.Split(p) 413 if name != pType { 414 return nil 415 } 416 procs, err := readPids(dir, subsystem, pType) 417 if err != nil { 418 return err 419 } 420 processes = append(processes, procs...) 
421 return nil 422 }) 423 return processes, err 424 } 425 426 // Freeze freezes the entire cgroup and all the processes inside it 427 func (c *cgroup) Freeze() error { 428 c.mu.Lock() 429 defer c.mu.Unlock() 430 if c.err != nil { 431 return c.err 432 } 433 s := c.getSubsystem(Freezer) 434 if s == nil { 435 return ErrFreezerNotSupported 436 } 437 sp, err := c.path(Freezer) 438 if err != nil { 439 return err 440 } 441 return s.(*freezerController).Freeze(sp) 442 } 443 444 // Thaw thaws out the cgroup and all the processes inside it 445 func (c *cgroup) Thaw() error { 446 c.mu.Lock() 447 defer c.mu.Unlock() 448 if c.err != nil { 449 return c.err 450 } 451 s := c.getSubsystem(Freezer) 452 if s == nil { 453 return ErrFreezerNotSupported 454 } 455 sp, err := c.path(Freezer) 456 if err != nil { 457 return err 458 } 459 return s.(*freezerController).Thaw(sp) 460 } 461 462 // OOMEventFD returns the memory cgroup's out of memory event fd that triggers 463 // when processes inside the cgroup receive an oom event. Returns 464 // ErrMemoryNotSupported if memory cgroups is not supported. 465 func (c *cgroup) OOMEventFD() (uintptr, error) { 466 c.mu.Lock() 467 defer c.mu.Unlock() 468 if c.err != nil { 469 return 0, c.err 470 } 471 s := c.getSubsystem(Memory) 472 if s == nil { 473 return 0, ErrMemoryNotSupported 474 } 475 sp, err := c.path(Memory) 476 if err != nil { 477 return 0, err 478 } 479 return s.(*memoryController).memoryEvent(sp, OOMEvent()) 480 } 481 482 // RegisterMemoryEvent allows the ability to register for all v1 memory cgroups 483 // notifications. 
484 func (c *cgroup) RegisterMemoryEvent(event MemoryEvent) (uintptr, error) { 485 c.mu.Lock() 486 defer c.mu.Unlock() 487 if c.err != nil { 488 return 0, c.err 489 } 490 s := c.getSubsystem(Memory) 491 if s == nil { 492 return 0, ErrMemoryNotSupported 493 } 494 sp, err := c.path(Memory) 495 if err != nil { 496 return 0, err 497 } 498 return s.(*memoryController).memoryEvent(sp, event) 499 } 500 501 // State returns the state of the cgroup and its processes 502 func (c *cgroup) State() State { 503 c.mu.Lock() 504 defer c.mu.Unlock() 505 c.checkExists() 506 if c.err != nil && c.err == ErrCgroupDeleted { 507 return Deleted 508 } 509 s := c.getSubsystem(Freezer) 510 if s == nil { 511 return Thawed 512 } 513 sp, err := c.path(Freezer) 514 if err != nil { 515 return Unknown 516 } 517 state, err := s.(*freezerController).state(sp) 518 if err != nil { 519 return Unknown 520 } 521 return state 522 } 523 524 // MoveTo does a recursive move subsystem by subsystem of all the processes 525 // inside the group 526 func (c *cgroup) MoveTo(destination Cgroup) error { 527 c.mu.Lock() 528 defer c.mu.Unlock() 529 if c.err != nil { 530 return c.err 531 } 532 for _, s := range c.subsystems { 533 processes, err := c.processes(s.Name(), true, cgroupProcs) 534 if err != nil { 535 return err 536 } 537 for _, p := range processes { 538 if err := destination.Add(p); err != nil { 539 if strings.Contains(err.Error(), "no such process") { 540 continue 541 } 542 return err 543 } 544 } 545 } 546 return nil 547 } 548 549 func (c *cgroup) getSubsystem(n Name) Subsystem { 550 for _, s := range c.subsystems { 551 if s.Name() == n { 552 return s 553 } 554 } 555 return nil 556 } 557 558 func (c *cgroup) checkExists() { 559 for _, s := range pathers(c.subsystems) { 560 p, err := c.path(s.Name()) 561 if err != nil { 562 return 563 } 564 if _, err := os.Lstat(s.Path(p)); err != nil { 565 if os.IsNotExist(err) { 566 c.err = ErrCgroupDeleted 567 return 568 } 569 } 570 } 571 }