nightmaremail

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs

pass.c (5235B)


      1 #include "mxf-send.h"
      2 
      3 /* this file is too long -------------------------------------------- PASSES */
      4 
      5 struct {
      6 	unsigned long id;	/* if 0, need a new pass */
      7 	int j;			/* defined if id; job number */
      8 	int fd;			/* defined if id; reading from {local,remote} */
      9 	seek_pos mpos;		/* defined if id; mark position */
     10 	substdio ss;
     11 	char buf[128];
     12 }
     13  pass[CHANNELS];
     14 
     15 void
     16 pass_init()
     17 {
     18 	int c;
     19 	for (c = 0; c < CHANNELS; ++c)
     20 		pass[c].id = 0;
     21 }
     22 
     23 void
     24 pass_selprep(datetime_sec * wakeup)
     25 {
     26 	/* -Lightning looks confused- Huh? Frick, I need to write a dictionary. */
     27 	int c;
     28 	struct prioq_elt pe;
     29 	if (flagexitasap)
     30 		return;
     31 	for (c = 0; c < CHANNELS; ++c)
     32 		if (pass[c].id)
     33 			if (del_avail(c)) {
     34 				*wakeup = 0;
     35 				return;
     36 			}
     37 	if (job_avail())
     38 		for (c = 0; c < CHANNELS; ++c)
     39 			if (!pass[c].id)
     40 				if (prioq_min(&pqchan[c], &pe))
     41 					if (*wakeup > pe.dt)
     42 						*wakeup = pe.dt;
     43 	if (prioq_min(&pqfail, &pe))
     44 		if (*wakeup > pe.dt)
     45 			*wakeup = pe.dt;
     46 	if (prioq_min(&pqdone, &pe))
     47 		if (*wakeup > pe.dt)
     48 			*wakeup = pe.dt;
     49 }
     50 
     51 static datetime_sec squareroot(datetime_sec x) {
     52 	/* result^2 <= x < (result + 1)^2 */
     53 	/* assuming: >= 0 */
     54 	datetime_sec y;
     55 	datetime_sec yy;
     56 	datetime_sec y21;
     57 	int j;
     58 
     59 	y = 0;
     60 	yy = 0;
     61 	for (j = 15; j >= 0; --j) {
     62 		y21 = (y << (j + 1)) + (1 << (j + j));
     63 		if (y21 <= x - yy) {
     64 			y += (1 << j);
     65 			yy += y21;
     66 		}
     67 	}
     68 	return y;
     69 }
     70 
     71 datetime_sec
     72 nextretry(datetime_sec birth, int c)
     73 {
     74 	datetime_sec n;
     75 
     76 	if (birth > recent)
     77 		n = 0;
     78 	else
     79 		n = squareroot(recent - birth);	/* no need to add fuzz to
     80 						 * recent */
     81 	n += chanskip[c];
     82 	return birth + n * n;
     83 }
     84 
     85 void
     86 pass_dochan(int c)
     87 {
     88 	datetime_sec birth;
     89 	struct prioq_elt pe;
     90 	static stralloc line = {0}; /* TA_ZERO */
     91 	int match;
     92 
     93 	if (flagexitasap) /* reporting to demob soon, no need to do this */
     94 		return;
     95 
     96 	if (!pass[c].id) {
     97 		if (!job_avail())
     98 			return;
     99 		if (!prioq_min(&pqchan[c], &pe))
    100 			return;
    101 		if (pe.dt > recent)
    102 			return;
    103 		fnmake_chanaddr(pe.id, c, &fn);
    104 
    105 		prioq_delmin(&pqchan[c]);
    106 		pass[c].mpos = 0;
    107 		pass[c].fd = open_read(fn.s);
    108 		if (pass[c].fd == -1)
    109 			goto trouble;
    110 		if (!getinfo(&line, &birth, pe.id)) {
    111 			close(pass[c].fd);
    112 			goto trouble;
    113 		}
    114 		pass[c].id = pe.id;
    115 		substdio_fdbuf(&pass[c].ss, read, pass[c].fd, pass[c].buf, sizeof(pass[c].buf));
    116 		pass[c].j = job_open(pe.id, c);
    117 		jo[pass[c].j].retry = nextretry(birth, c);
    118 		jo[pass[c].j].flagdying = (recent > birth + lifetime);
    119 		while (!stralloc_copy(&jo[pass[c].j].sender, &line))
    120 			nomem();
    121 	}
    122 
    123 	if (!del_avail(c))
    124 		return;
    125 
    126 	if (getln(&pass[c].ss, &line, &match, '\0') == -1) {
    127 		fnmake_chanaddr(pass[c].id, c, &fn);
    128 		log3("warning: trouble reading ", fn.s, "; will try again later\n");
    129 		close(pass[c].fd);
    130 		job_close(pass[c].j);
    131 		pass[c].id = 0;
    132 		return;
    133 	}
    134 	if (!match) {
    135 		close(pass[c].fd);
    136 		jo[pass[c].j].flaghiteof = 1;
    137 		job_close(pass[c].j);
    138 		pass[c].id = 0;
    139 		return;
    140 	}
    141 	switch (line.s[0]) {
    142 	case 'T':
    143 		++jo[pass[c].j].numtodo;
    144 		del_start(pass[c].j, pass[c].mpos, line.s + 1);
    145 		break;
    146 	case 'D':
    147 		break;
    148 	default:
    149 		fnmake_chanaddr(pass[c].id, c, &fn);
    150 		log3("warning: unknown record type in ", fn.s, "!\n");
    151 		close(pass[c].fd);
    152 		job_close(pass[c].j);
    153 		pass[c].id = 0;
    154 		return;
    155 	}
    156 
    157 	pass[c].mpos += line.len;
    158 	return;
    159 
    160 trouble:
    161 	log3("warning: trouble opening ", fn.s, "; will try again later\n");
    162 	pe.dt = recent + SLEEP_SYSFAIL;
    163 	while (!prioq_insert(&pqchan[c], &pe))
    164 		nomem();
    165 }
    166 
    167 void
    168 messdone(unsigned long id)
    169 {
    170 	char ch;
    171 	int c;
    172 	struct prioq_elt pe;
    173 	struct stat st;
    174 
    175 	for (c = 0; c < CHANNELS; ++c) {
    176 		fnmake_chanaddr(id, c, &fn);
    177 		if (stat(fn.s, &st) == 0)
    178 			return;	/* false alarm; consequence of HOPEFULLY */
    179 		if (errno != error_noent) {
    180 			log3("warning: unable to stat ", fn.s, "; will try again later\n");
    181 			goto fail;
    182 		}
    183 	}
    184 
    185 	fnmake_todo(id, &fn);
    186 	if (stat(fn.s, &st) == 0)
    187 		return;
    188 	if (errno != error_noent) {
    189 		log3("warning: unable to stat ", fn.s, "; will try again later\n");
    190 		goto fail;
    191 	}
    192 
    193 	fnmake_info(id, &fn);
    194 	if (stat(fn.s, &st) == -1) {
    195 		if (errno == error_noent)
    196 			return;
    197 		log3("warning: unable to stat ", fn.s, "; will try again later\n");
    198 		goto fail;
    199 	}
    200 
    201 	/* -todo +info -local -remote ?bounce */
    202 	if (!injectbounce(id))
    203 		goto fail;	/* injectbounce() produced error message */
    204 
    205 	strnum3[fmt_ulong(strnum3, id)] = 0;
    206 	log3("end msg ", strnum3, "\n");
    207 
    208 	/* -todo +info -local -remote -bounce */
    209 	fnmake_info(id, &fn);
    210 	if (unlink(fn.s) == -1) {
    211 		log3("warning: unable to unlink ", fn.s, "; will try again later\n");
    212 		goto fail;
    213 	}
    214 
    215 	/* -todo -info -local -remote -bounce; we can relax */
    216 	fnmake_foop(id, &fn);
    217 	if (substdio_putflush(&sstoqc, fn.s, fn.len) == -1) {
    218 		cleandied();
    219 		return;
    220 	}
    221 	if (substdio_get(&ssfromqc, &ch, 1) != 1) {
    222 		cleandied();
    223 		return;
    224 	}
    225 	if (ch != '+')
    226 		log3("warning: qmail-clean unable to clean up ", fn.s, "\n");
    227 
    228 	return;
    229 
    230 fail:
    231 	pe.id = id;
    232 	pe.dt = now() + SLEEP_SYSFAIL;
    233 	while (!prioq_insert(&pqdone, &pe))
    234 		nomem();
    235 }
    236 
    237 void
    238 pass_do()
    239 {
    240 	int c;
    241 	struct prioq_elt pe;
    242 
    243 	for (c = 0; c < CHANNELS; ++c)
    244 		pass_dochan(c);
    245 	if (prioq_min(&pqfail, &pe))
    246 		if (pe.dt <= recent) {
    247 			prioq_delmin(&pqfail);
    248 			pqadd(pe.id);
    249 		}
    250 	if (prioq_min(&pqdone, &pe))
    251 		if (pe.dt <= recent) {
    252 			prioq_delmin(&pqdone);
    253 			messdone(pe.id);
    254 		}
    255 }