md5-server_amd64.go (9902B)
//+build !noasm,!appengine,gc

// Copyright (c) 2020 MinIO Inc. All rights reserved.
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

package md5simd

import (
	"encoding/binary"
	"fmt"
	"runtime"
	"sync"

	"github.com/klauspost/cpuid/v2"
)

// MD5 initialization constants
const (
	// Lanes is the number of concurrently calculated hashes.
	Lanes = 16

	init0 = 0x67452301
	init1 = 0xefcdab89
	init2 = 0x98badcfe
	init3 = 0x10325476

	// Use scalar routine when below this many lanes
	useScalarBelow = 3
)

// md5ServerUID - Does not start at 0 but at the next multiple of 16,
// so that it can be distinguished from the default initialisation value of 0
const md5ServerUID = Lanes

const buffersPerLane = 3

// Message to send across input channel
type blockInput struct {
	uid   uint64
	msg   []byte
	sumCh chan sumResult
	reset bool
}

type sumResult struct {
	digest [Size]byte
}

type lanesInfo [Lanes]blockInput

// md5Server - Type to implement parallel handling of MD5 invocations
type md5Server struct {
	uidCounter   uint64
	cycle        chan uint64           // client with uid has update.
	newInput     chan newClient        // Add new client.
	digests      map[uint64][Size]byte // Map of uids to (interim) digest results
	maskRounds16 [16]maskRounds        // Pre-allocated static array for max 16 rounds
	maskRounds8a [8]maskRounds         // Pre-allocated static array for max 8 rounds (1st AVX2 core)
	maskRounds8b [8]maskRounds         // Pre-allocated static array for max 8 rounds (2nd AVX2 core)
	allBufs      []byte                // Preallocated buffer.
	buffers      chan []byte           // Preallocated buffers, sliced from allBufs.

	i8       [2][8][]byte // avx2 temporary vars
	d8a, d8b digest8
	wg       sync.WaitGroup
}

// NewServer - Create a new object for parallel processing of MD5 invocations
func NewServer() Server {
	if !cpuid.CPU.Supports(cpuid.AVX2) {
		return &fallbackServer{}
	}
	md5srv := &md5Server{}
	md5srv.digests = make(map[uint64][Size]byte)
	md5srv.newInput = make(chan newClient, Lanes)
	md5srv.cycle = make(chan uint64, Lanes*10)
	md5srv.uidCounter = md5ServerUID - 1
	md5srv.allBufs = make([]byte, 32+buffersPerLane*Lanes*internalBlockSize)
	md5srv.buffers = make(chan []byte, buffersPerLane*Lanes)
	// Fill buffers.
	for i := 0; i < buffersPerLane*Lanes; i++ {
		s := 32 + i*internalBlockSize
		md5srv.buffers <- md5srv.allBufs[s : s+internalBlockSize : s+internalBlockSize]
	}

	// Start a single thread for reading from the input channel
	go md5srv.process(md5srv.newInput)
	return md5srv
}

type newClient struct {
	uid   uint64
	input chan blockInput
}

// process - Sole handler for reading from the input channel.
func (s *md5Server) process(newClients chan newClient) {
	// To fill up as many lanes as possible:
	//
	// 1. Wait for a cycle id.
	// 2. If not already in a lane, add, otherwise leave on channel
	// 3. Start timer
	// 4. Check if lanes are full, if so, goto 10 (process).
	// 5. If timeout, goto 10.
	// 6. Wait for new id (goto 2) or timeout (goto 10).
	// 10. Process.
	// 11. Check all clients for queued input; if found, add it to the lanes.
	// 12. Goto 1

	// lanes contains the lanes.
	var lanes lanesInfo
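	// Each client occupies at most one lane per cycle; addToLane below skips
	// uids that already hold a lane.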
	// lanesFilled contains the number of filled lanes for the current cycle.
	var lanesFilled int
	// clients contains active clients
	var clients = make(map[uint64]chan blockInput, Lanes)

	addToLane := func(uid uint64) {
		cl, ok := clients[uid]
		if !ok {
			// Unknown client. Maybe it was already removed.
			return
		}
		// Check if we already have it.
		for _, lane := range lanes[:lanesFilled] {
			if lane.uid == uid {
				return
			}
		}
		// Continue until we get a block or there is nothing on channel
		for {
			select {
			case block, ok := <-cl:
				if !ok {
					// Client disconnected; block is the zero value here,
					// so use the captured uid.
					delete(clients, uid)
					return
				}
				if block.uid != uid {
					panic(fmt.Errorf("uid mismatch, %d (block) != %d (client)", block.uid, uid))
				}
				// If reset message, reset and we're done
				if block.reset {
					delete(s.digests, uid)
					continue
				}

				// If requesting sum, we will need to maintain state.
				if block.sumCh != nil {
					var dig digest
					d, ok := s.digests[uid]
					if ok {
						dig.s[0] = binary.LittleEndian.Uint32(d[0:4])
						dig.s[1] = binary.LittleEndian.Uint32(d[4:8])
						dig.s[2] = binary.LittleEndian.Uint32(d[8:12])
						dig.s[3] = binary.LittleEndian.Uint32(d[12:16])
					} else {
						dig.s[0], dig.s[1], dig.s[2], dig.s[3] = init0, init1, init2, init3
					}

					sum := sumResult{}
					// Add end block to current digest.
					blockScalar(&dig.s, block.msg)

					binary.LittleEndian.PutUint32(sum.digest[0:], dig.s[0])
					binary.LittleEndian.PutUint32(sum.digest[4:], dig.s[1])
					binary.LittleEndian.PutUint32(sum.digest[8:], dig.s[2])
					binary.LittleEndian.PutUint32(sum.digest[12:], dig.s[3])
					block.sumCh <- sum
					if block.msg != nil {
						s.buffers <- block.msg
					}
					continue
				}
				if len(block.msg) == 0 {
					continue
				}
				lanes[lanesFilled] = block
				lanesFilled++
				return
			default:
				return
			}
		}
	}
	addNewClient := func(cl newClient) {
		if _, ok := clients[cl.uid]; ok {
			panic("internal error: duplicate client registration")
		}
		clients[cl.uid] = cl.input
	}

	allLanesFilled := func() bool {
		return lanesFilled == Lanes || lanesFilled >= len(clients)
	}

	for {
		// Step 1.
		for lanesFilled == 0 {
			select {
			case cl, ok := <-newClients:
				if !ok {
					return
				}
				addNewClient(cl)
				// Check if it already sent a payload.
				addToLane(cl.uid)
				continue
			case uid := <-s.cycle:
				addToLane(uid)
			}
		}

	fillLanes:
		for !allLanesFilled() {
			select {
			case cl, ok := <-newClients:
				if !ok {
					return
				}
				addNewClient(cl)

			case uid := <-s.cycle:
				addToLane(uid)
			default:
				// Nothing more queued...
				break fillLanes
			}
		}

		// If we did not fill all lanes, check if there is more waiting
		if !allLanesFilled() {
			runtime.Gosched()
			for uid := range clients {
				addToLane(uid)
				if allLanesFilled() {
					break
				}
			}
		}
		if false {
			if !allLanesFilled() {
				fmt.Println("Not all lanes filled", lanesFilled, "of", len(clients))
				//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
			} else if true {
				fmt.Println("all lanes filled")
			}
		}
		// Process the lanes we could collect
		s.blocks(lanes[:lanesFilled])

		// Clear lanes...
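		// s.blocks has already returned the message buffers to the pool and
		// zeroed each processed lane entry, so only the fill count needs to
		// be reset here.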
		lanesFilled = 0
		// Add all currently queued input.
		for uid := range clients {
			addToLane(uid)
			if allLanesFilled() {
				break
			}
		}
	}
}

func (s *md5Server) Close() {
	if s.newInput != nil {
		close(s.newInput)
		s.newInput = nil
	}
}

// Invoke assembly and send results back
func (s *md5Server) blocks(lanes []blockInput) {
	if len(lanes) < useScalarBelow {
		// Use scalar routine when below this many lanes
		switch len(lanes) {
		case 0:
		case 1:
			lane := lanes[0]
			var d digest
			a, ok := s.digests[lane.uid]
			if ok {
				d.s[0] = binary.LittleEndian.Uint32(a[0:4])
				d.s[1] = binary.LittleEndian.Uint32(a[4:8])
				d.s[2] = binary.LittleEndian.Uint32(a[8:12])
				d.s[3] = binary.LittleEndian.Uint32(a[12:16])
			} else {
				d.s[0] = init0
				d.s[1] = init1
				d.s[2] = init2
				d.s[3] = init3
			}
			if len(lane.msg) > 0 {
				// Update...
				blockScalar(&d.s, lane.msg)
			}
			dig := [Size]byte{}
			binary.LittleEndian.PutUint32(dig[0:], d.s[0])
			binary.LittleEndian.PutUint32(dig[4:], d.s[1])
			binary.LittleEndian.PutUint32(dig[8:], d.s[2])
			binary.LittleEndian.PutUint32(dig[12:], d.s[3])
			s.digests[lane.uid] = dig

			if lane.msg != nil {
				s.buffers <- lane.msg
			}
			lanes[0] = blockInput{}

		default:
			s.wg.Add(len(lanes))
			var results [useScalarBelow]digest
			for i := range lanes {
				lane := lanes[i]
				go func(i int) {
					var d digest
					defer s.wg.Done()
					a, ok := s.digests[lane.uid]
					if ok {
						d.s[0] = binary.LittleEndian.Uint32(a[0:4])
						d.s[1] = binary.LittleEndian.Uint32(a[4:8])
						d.s[2] = binary.LittleEndian.Uint32(a[8:12])
						d.s[3] = binary.LittleEndian.Uint32(a[12:16])
					} else {
						d.s[0] = init0
						d.s[1] = init1
						d.s[2] = init2
						d.s[3] = init3
					}
					if len(lane.msg) == 0 {
						results[i] = d
						return
					}
					// Update...
					blockScalar(&d.s, lane.msg)
					results[i] = d
				}(i)
			}
			s.wg.Wait()
			for i, lane := range lanes {
				dig := [Size]byte{}
				binary.LittleEndian.PutUint32(dig[0:], results[i].s[0])
				binary.LittleEndian.PutUint32(dig[4:], results[i].s[1])
				binary.LittleEndian.PutUint32(dig[8:], results[i].s[2])
				binary.LittleEndian.PutUint32(dig[12:], results[i].s[3])
				s.digests[lane.uid] = dig

				if lane.msg != nil {
					s.buffers <- lane.msg
				}
				lanes[i] = blockInput{}
			}
		}
		return
	}

	inputs := [16][]byte{}
	for i := range lanes {
		inputs[i] = lanes[i].msg
	}

	// Collect active digests...
	state := s.getDigests(lanes)
	// Process all lanes...
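	// The third argument hints that at most 8 lanes carry data; in that case
	// the AVX2 fallback should only need its first 8-lane core (see the
	// maskRounds8a/maskRounds8b fields above).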
	s.blockMd5_x16(&state, inputs, len(lanes) <= 8)

	for i, lane := range lanes {
		uid := lane.uid
		dig := [Size]byte{}
		binary.LittleEndian.PutUint32(dig[0:], state.v0[i])
		binary.LittleEndian.PutUint32(dig[4:], state.v1[i])
		binary.LittleEndian.PutUint32(dig[8:], state.v2[i])
		binary.LittleEndian.PutUint32(dig[12:], state.v3[i])

		s.digests[uid] = dig
		if lane.msg != nil {
			s.buffers <- lane.msg
		}
		lanes[i] = blockInput{}
	}
}

func (s *md5Server) getDigests(lanes []blockInput) (d digest16) {
	for i, lane := range lanes {
		a, ok := s.digests[lane.uid]
		if ok {
			d.v0[i] = binary.LittleEndian.Uint32(a[0:4])
			d.v1[i] = binary.LittleEndian.Uint32(a[4:8])
			d.v2[i] = binary.LittleEndian.Uint32(a[8:12])
			d.v3[i] = binary.LittleEndian.Uint32(a[12:16])
		} else {
			d.v0[i] = init0
			d.v1[i] = init1
			d.v2[i] = init2
			d.v3[i] = init3
		}
	}
	return
}
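
// A minimal usage sketch from the caller's side (an assumption for
// illustration: the Server and Hasher interfaces are declared elsewhere in
// this package). The server returned by NewServer hands out hashers whose
// blocks are fed through the shared lanes above:
//
//	srv := md5simd.NewServer()
//	defer srv.Close()
//
//	h := srv.NewHash()
//	defer h.Close()
//	h.Write(data)
//	sum := h.Sum(nil)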