gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

md5-server_amd64.go (9902B)


      1 //+build !noasm,!appengine,gc
      2 
      3 // Copyright (c) 2020 MinIO Inc. All rights reserved.
      4 // Use of this source code is governed by a license that can be
      5 // found in the LICENSE file.
      6 
      7 package md5simd
      8 
      9 import (
     10 	"encoding/binary"
     11 	"fmt"
     12 	"runtime"
     13 	"sync"
     14 
     15 	"github.com/klauspost/cpuid/v2"
     16 )
     17 
     18 // MD5 initialization constants
     19 const (
     20 	// Lanes is the number of concurrently calculated hashes.
     21 	Lanes = 16
     22 
     23 	init0 = 0x67452301
     24 	init1 = 0xefcdab89
     25 	init2 = 0x98badcfe
     26 	init3 = 0x10325476
     27 
     28 	// Use scalar routine when below this many lanes
     29 	useScalarBelow = 3
     30 )
     31 
     32 // md5ServerUID - Does not start at 0 but next multiple of 16 so as to be able to
     33 // differentiate with default initialisation value of 0
     34 const md5ServerUID = Lanes
     35 
     36 const buffersPerLane = 3
     37 
     38 // Message to send across input channel
     39 type blockInput struct {
     40 	uid   uint64
     41 	msg   []byte
     42 	sumCh chan sumResult
     43 	reset bool
     44 }
     45 
     46 type sumResult struct {
     47 	digest [Size]byte
     48 }
     49 
     50 type lanesInfo [Lanes]blockInput
     51 
     52 // md5Server - Type to implement parallel handling of MD5 invocations
     53 type md5Server struct {
     54 	uidCounter   uint64
     55 	cycle        chan uint64           // client with uid has update.
     56 	newInput     chan newClient        // Add new client.
     57 	digests      map[uint64][Size]byte // Map of uids to (interim) digest results
     58 	maskRounds16 [16]maskRounds        // Pre-allocated static array for max 16 rounds
     59 	maskRounds8a [8]maskRounds         // Pre-allocated static array for max 8 rounds (1st AVX2 core)
     60 	maskRounds8b [8]maskRounds         // Pre-allocated static array for max 8 rounds (2nd AVX2 core)
     61 	allBufs      []byte                // Preallocated buffer.
     62 	buffers      chan []byte           // Preallocated buffers, sliced from allBufs.
     63 
     64 	i8       [2][8][]byte // avx2 temporary vars
     65 	d8a, d8b digest8
     66 	wg       sync.WaitGroup
     67 }
     68 
     69 // NewServer - Create new object for parallel processing handling
     70 func NewServer() Server {
     71 	if !cpuid.CPU.Supports(cpuid.AVX2) {
     72 		return &fallbackServer{}
     73 	}
     74 	md5srv := &md5Server{}
     75 	md5srv.digests = make(map[uint64][Size]byte)
     76 	md5srv.newInput = make(chan newClient, Lanes)
     77 	md5srv.cycle = make(chan uint64, Lanes*10)
     78 	md5srv.uidCounter = md5ServerUID - 1
     79 	md5srv.allBufs = make([]byte, 32+buffersPerLane*Lanes*internalBlockSize)
     80 	md5srv.buffers = make(chan []byte, buffersPerLane*Lanes)
     81 	// Fill buffers.
     82 	for i := 0; i < buffersPerLane*Lanes; i++ {
     83 		s := 32 + i*internalBlockSize
     84 		md5srv.buffers <- md5srv.allBufs[s : s+internalBlockSize : s+internalBlockSize]
     85 	}
     86 
     87 	// Start a single thread for reading from the input channel
     88 	go md5srv.process(md5srv.newInput)
     89 	return md5srv
     90 }
     91 
     92 type newClient struct {
     93 	uid   uint64
     94 	input chan blockInput
     95 }
     96 
     97 // process - Sole handler for reading from the input channel.
     98 func (s *md5Server) process(newClients chan newClient) {
     99 	// To fill up as many lanes as possible:
    100 	//
    101 	// 1. Wait for a cycle id.
    102 	// 2. If not already in a lane, add, otherwise leave on channel
    103 	// 3. Start timer
    104 	// 4. Check if lanes is full, if so, goto 10 (process).
    105 	// 5. If timeout, goto 10.
    106 	// 6. Wait for new id (goto 2)  or timeout (goto 10).
    107 	// 10. Process.
    108 	// 11. Check all input if there is already input, if so add to lanes.
    109 	// 12. Goto 1
    110 
    111 	// lanes contains the lanes.
    112 	var lanes lanesInfo
    113 	// lanesFilled contains the number of filled lanes for current cycle.
    114 	var lanesFilled int
    115 	// clients contains active clients
    116 	var clients = make(map[uint64]chan blockInput, Lanes)
    117 
    118 	addToLane := func(uid uint64) {
    119 		cl, ok := clients[uid]
    120 		if !ok {
    121 			// Unknown client. Maybe it was already removed.
    122 			return
    123 		}
    124 		// Check if we already have it.
    125 		for _, lane := range lanes[:lanesFilled] {
    126 			if lane.uid == uid {
    127 				return
    128 			}
    129 		}
    130 		// Continue until we get a block or there is nothing on channel
    131 		for {
    132 			select {
    133 			case block, ok := <-cl:
    134 				if !ok {
    135 					// Client disconnected
    136 					delete(clients, block.uid)
    137 					return
    138 				}
    139 				if block.uid != uid {
    140 					panic(fmt.Errorf("uid mismatch, %d (block) != %d (client)", block.uid, uid))
    141 				}
    142 				// If reset message, reset and we're done
    143 				if block.reset {
    144 					delete(s.digests, uid)
    145 					continue
    146 				}
    147 
    148 				// If requesting sum, we will need to maintain state.
    149 				if block.sumCh != nil {
    150 					var dig digest
    151 					d, ok := s.digests[uid]
    152 					if ok {
    153 						dig.s[0] = binary.LittleEndian.Uint32(d[0:4])
    154 						dig.s[1] = binary.LittleEndian.Uint32(d[4:8])
    155 						dig.s[2] = binary.LittleEndian.Uint32(d[8:12])
    156 						dig.s[3] = binary.LittleEndian.Uint32(d[12:16])
    157 					} else {
    158 						dig.s[0], dig.s[1], dig.s[2], dig.s[3] = init0, init1, init2, init3
    159 					}
    160 
    161 					sum := sumResult{}
    162 					// Add end block to current digest.
    163 					blockScalar(&dig.s, block.msg)
    164 
    165 					binary.LittleEndian.PutUint32(sum.digest[0:], dig.s[0])
    166 					binary.LittleEndian.PutUint32(sum.digest[4:], dig.s[1])
    167 					binary.LittleEndian.PutUint32(sum.digest[8:], dig.s[2])
    168 					binary.LittleEndian.PutUint32(sum.digest[12:], dig.s[3])
    169 					block.sumCh <- sum
    170 					if block.msg != nil {
    171 						s.buffers <- block.msg
    172 					}
    173 					continue
    174 				}
    175 				if len(block.msg) == 0 {
    176 					continue
    177 				}
    178 				lanes[lanesFilled] = block
    179 				lanesFilled++
    180 				return
    181 			default:
    182 				return
    183 			}
    184 		}
    185 	}
    186 	addNewClient := func(cl newClient) {
    187 		if _, ok := clients[cl.uid]; ok {
    188 			panic("internal error: duplicate client registration")
    189 		}
    190 		clients[cl.uid] = cl.input
    191 	}
    192 
    193 	allLanesFilled := func() bool {
    194 		return lanesFilled == Lanes || lanesFilled >= len(clients)
    195 	}
    196 
    197 	for {
    198 		// Step 1.
    199 		for lanesFilled == 0 {
    200 			select {
    201 			case cl, ok := <-newClients:
    202 				if !ok {
    203 					return
    204 				}
    205 				addNewClient(cl)
    206 				// Check if it already sent a payload.
    207 				addToLane(cl.uid)
    208 				continue
    209 			case uid := <-s.cycle:
    210 				addToLane(uid)
    211 			}
    212 		}
    213 
    214 	fillLanes:
    215 		for !allLanesFilled() {
    216 			select {
    217 			case cl, ok := <-newClients:
    218 				if !ok {
    219 					return
    220 				}
    221 				addNewClient(cl)
    222 
    223 			case uid := <-s.cycle:
    224 				addToLane(uid)
    225 			default:
    226 				// Nothing more queued...
    227 				break fillLanes
    228 			}
    229 		}
    230 
    231 		// If we did not fill all lanes, check if there is more waiting
    232 		if !allLanesFilled() {
    233 			runtime.Gosched()
    234 			for uid := range clients {
    235 				addToLane(uid)
    236 				if allLanesFilled() {
    237 					break
    238 				}
    239 			}
    240 		}
    241 		if false {
    242 			if !allLanesFilled() {
    243 				fmt.Println("Not all lanes filled", lanesFilled, "of", len(clients))
    244 				//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
    245 			} else if true {
    246 				fmt.Println("all lanes filled")
    247 			}
    248 		}
    249 		// Process the lanes we could collect
    250 		s.blocks(lanes[:lanesFilled])
    251 
    252 		// Clear lanes...
    253 		lanesFilled = 0
    254 		// Add all current queued
    255 		for uid := range clients {
    256 			addToLane(uid)
    257 			if allLanesFilled() {
    258 				break
    259 			}
    260 		}
    261 	}
    262 }
    263 
    264 func (s *md5Server) Close() {
    265 	if s.newInput != nil {
    266 		close(s.newInput)
    267 		s.newInput = nil
    268 	}
    269 }
    270 
    271 // Invoke assembly and send results back
    272 func (s *md5Server) blocks(lanes []blockInput) {
    273 	if len(lanes) < useScalarBelow {
    274 		// Use scalar routine when below this many lanes
    275 		switch len(lanes) {
    276 		case 0:
    277 		case 1:
    278 			lane := lanes[0]
    279 			var d digest
    280 			a, ok := s.digests[lane.uid]
    281 			if ok {
    282 				d.s[0] = binary.LittleEndian.Uint32(a[0:4])
    283 				d.s[1] = binary.LittleEndian.Uint32(a[4:8])
    284 				d.s[2] = binary.LittleEndian.Uint32(a[8:12])
    285 				d.s[3] = binary.LittleEndian.Uint32(a[12:16])
    286 			} else {
    287 				d.s[0] = init0
    288 				d.s[1] = init1
    289 				d.s[2] = init2
    290 				d.s[3] = init3
    291 			}
    292 			if len(lane.msg) > 0 {
    293 				// Update...
    294 				blockScalar(&d.s, lane.msg)
    295 			}
    296 			dig := [Size]byte{}
    297 			binary.LittleEndian.PutUint32(dig[0:], d.s[0])
    298 			binary.LittleEndian.PutUint32(dig[4:], d.s[1])
    299 			binary.LittleEndian.PutUint32(dig[8:], d.s[2])
    300 			binary.LittleEndian.PutUint32(dig[12:], d.s[3])
    301 			s.digests[lane.uid] = dig
    302 
    303 			if lane.msg != nil {
    304 				s.buffers <- lane.msg
    305 			}
    306 			lanes[0] = blockInput{}
    307 
    308 		default:
    309 			s.wg.Add(len(lanes))
    310 			var results [useScalarBelow]digest
    311 			for i := range lanes {
    312 				lane := lanes[i]
    313 				go func(i int) {
    314 					var d digest
    315 					defer s.wg.Done()
    316 					a, ok := s.digests[lane.uid]
    317 					if ok {
    318 						d.s[0] = binary.LittleEndian.Uint32(a[0:4])
    319 						d.s[1] = binary.LittleEndian.Uint32(a[4:8])
    320 						d.s[2] = binary.LittleEndian.Uint32(a[8:12])
    321 						d.s[3] = binary.LittleEndian.Uint32(a[12:16])
    322 					} else {
    323 						d.s[0] = init0
    324 						d.s[1] = init1
    325 						d.s[2] = init2
    326 						d.s[3] = init3
    327 					}
    328 					if len(lane.msg) == 0 {
    329 						results[i] = d
    330 						return
    331 					}
    332 					// Update...
    333 					blockScalar(&d.s, lane.msg)
    334 					results[i] = d
    335 				}(i)
    336 			}
    337 			s.wg.Wait()
    338 			for i, lane := range lanes {
    339 				dig := [Size]byte{}
    340 				binary.LittleEndian.PutUint32(dig[0:], results[i].s[0])
    341 				binary.LittleEndian.PutUint32(dig[4:], results[i].s[1])
    342 				binary.LittleEndian.PutUint32(dig[8:], results[i].s[2])
    343 				binary.LittleEndian.PutUint32(dig[12:], results[i].s[3])
    344 				s.digests[lane.uid] = dig
    345 
    346 				if lane.msg != nil {
    347 					s.buffers <- lane.msg
    348 				}
    349 				lanes[i] = blockInput{}
    350 			}
    351 		}
    352 		return
    353 	}
    354 
    355 	inputs := [16][]byte{}
    356 	for i := range lanes {
    357 		inputs[i] = lanes[i].msg
    358 	}
    359 
    360 	// Collect active digests...
    361 	state := s.getDigests(lanes)
    362 	// Process all lanes...
    363 	s.blockMd5_x16(&state, inputs, len(lanes) <= 8)
    364 
    365 	for i, lane := range lanes {
    366 		uid := lane.uid
    367 		dig := [Size]byte{}
    368 		binary.LittleEndian.PutUint32(dig[0:], state.v0[i])
    369 		binary.LittleEndian.PutUint32(dig[4:], state.v1[i])
    370 		binary.LittleEndian.PutUint32(dig[8:], state.v2[i])
    371 		binary.LittleEndian.PutUint32(dig[12:], state.v3[i])
    372 
    373 		s.digests[uid] = dig
    374 		if lane.msg != nil {
    375 			s.buffers <- lane.msg
    376 		}
    377 		lanes[i] = blockInput{}
    378 	}
    379 }
    380 
    381 func (s *md5Server) getDigests(lanes []blockInput) (d digest16) {
    382 	for i, lane := range lanes {
    383 		a, ok := s.digests[lane.uid]
    384 		if ok {
    385 			d.v0[i] = binary.LittleEndian.Uint32(a[0:4])
    386 			d.v1[i] = binary.LittleEndian.Uint32(a[4:8])
    387 			d.v2[i] = binary.LittleEndian.Uint32(a[8:12])
    388 			d.v3[i] = binary.LittleEndian.Uint32(a[12:16])
    389 		} else {
    390 			d.v0[i] = init0
    391 			d.v1[i] = init1
    392 			d.v2[i] = init2
    393 			d.v3[i] = init3
    394 		}
    395 	}
    396 	return
    397 }