
Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs

deliveries.c (5123B)

      1 #include "mxf-send.h"
      2 /* this file is too long ---------------------------------------- DELIVERIES */
      4 struct del {
      5 	int used;
      6 	int j;
      7 	unsigned long delid;
      8 	seek_pos mpos;
      9 	stralloc recip;
     10 }
     11 ;
     13 unsigned long masterdelid = 1;
     14 unsigned int concurrency[CHANNELS] = {10, 20};
     15 unsigned int concurrencyused[CHANNELS] = {0, 0};
     16 struct del *d[CHANNELS];
     17 stralloc dline[CHANNELS];
     18 char delbuf[2048];
     20 void
     21 del_status()
     22 {
     23 	int c;
     25 	log1("status:");
     26 	for (c = 0; c < CHANNELS; ++c) {
     27 		strnum2[fmt_ulong(strnum2, (unsigned long)concurrencyused[c])] = 0;
     28 		strnum3[fmt_ulong(strnum3, (unsigned long)concurrency[c])] = 0;
     29 		qslog2(chanstatusmsg[c], strnum2);
     30 		qslog2("/", strnum3);
     31 	}
     32 	if (flagexitasap)
     33 		log1(" exitasap");
     34 	log1("\n");
     35 }
     37 void
     38 del_init()
     39 {
     40 	int c;
     41 	unsigned int i;
     42 	for (c = 0; c < CHANNELS; ++c) {
     43 		flagspawnalive[c] = 1;
     44 		while (!(d[c] = (struct del *)alloc(concurrency[c] * sizeof(struct del))))
     45 			nomem();
     46 		for (i = 0; i < concurrency[c]; ++i) {
     47 			d[c][i].used = 0;
     48 			d[c][i].recip.s = 0;
     49 		}
     50 		dline[c].s = 0;
     51 		while (!stralloc_copys(&dline[c], ""))
     52 			nomem();
     53 	}
     54 	del_status();
     55 }
     57 int
     58 del_canexit()
     59 {
     60 	int c;
     61 	for (c = 0; c < CHANNELS; ++c)
     62 		if (flagspawnalive[c])	/* if dead, nothing we can do about
     63 					 * its jobs */
     64 			if (concurrencyused[c])
     65 				return 0;
     66 	return 1;
     67 }
     69 int
     70 del_avail(int c)
     71 {
     72 	return flagspawnalive[c] && comm_canwrite(c) && (concurrencyused[c] < concurrency[c]);
     73 }
     75 void
     76 del_start(int j, seek_pos mpos, char *recip)
     77 {
     78 	unsigned int i;
     79 	int c;
     81 	c = jo[j].channel;
     82 	if (!flagspawnalive[c])
     83 		return;
     84 	if (!comm_canwrite(c))
     85 		return;
     87 	for (i = 0; i < concurrency[c]; ++i)
     88 		if (!d[c][i].used)
     89 			break;
     90 	if (i == concurrency[c])
     91 		return;
     93 	if (!stralloc_copys(&d[c][i].recip, recip)) {
     94 		nomem();
     95 		return;
     96 	}
     97 	if (!stralloc_0(&d[c][i].recip)) {
     98 		nomem();
     99 		return;
    100 	}
    101 	d[c][i].j = j;
    102 	++jo[j].refs;
    103 	d[c][i].delid = masterdelid++;
    104 	d[c][i].mpos = mpos;
    105 	d[c][i].used = 1;
    106 	++concurrencyused[c];
    108 	comm_write(c, i, jo[j].id, jo[j].sender.s, recip);
    110 	strnum2[fmt_ulong(strnum2, d[c][i].delid)] = 0;
    111 	strnum3[fmt_ulong(strnum3, jo[j].id)] = 0;
    112 	qslog2("starting delivery ", strnum2);
    113 	log3(": msg ", strnum3, tochan[c]);
    114 	logsafe(recip);
    115 	log1("\n");
    116 	del_status();
    117 }
    119 void
    120 markdone(int c, unsigned long id, seek_pos pos)
    121 {
    122 	struct stat st;
    123 	int fd;
    124 	fnmake_chanaddr(id, c, &fn);
    125 	for (;;) {
    126 		fd = open_write(fn.s);
    127 		if (fd == -1)
    128 			break;
    129 		if (fstat(fd, &st) == -1) {
    130 			close(fd);
    131 			break;
    132 		}
    133 		if (seek_set(fd, pos) == -1) {
    134 			close(fd);
    135 			break;
    136 		}
    137 		if (write(fd, "D", 1) != 1) {
    138 			close(fd);
    139 			break;
    140 		}
    141 		/*
    142 		 * further errors -> double delivery without us knowing about
    143 		 * it, oh well
    144 		 */
    145 		close(fd);
    146 		return;
    147 	}
    148 	log3("warning: trouble marking ", fn.s, "; message will be delivered twice!\n");
    149 }
    151 void
    152 del_dochan(int c)
    153 {
    154 	int r;
    155 	char ch;
    156 	int i;
    157 	int delnum;
    158 	r = read(chanfdin[c], delbuf, sizeof(delbuf));
    159 	if (r == -1)
    160 		return;
    161 	if (r == 0) {
    162 		spawndied(c);
    163 		return;
    164 	}
    165 	for (i = 0; i < r; ++i) {
    166 		ch = delbuf[i];
    167 		while (!stralloc_append(&dline[c], &ch))
    168 			nomem();
    169 		if (dline[c].len > REPORTMAX)
    170 			dline[c].len = REPORTMAX;
    171 		/*
    172 		 * qmail-lspawn and qmail-rspawn are responsible for keeping
    173 		 * it short
    174 		 */
    175 		/* but from a security point of view, we don't trust rspawn */
    176 		if (!ch && (dline[c].len > 1)) {
    177 			delnum = (unsigned int)(unsigned char)dline[c].s[0];
    178 			if ((delnum < 0) || (delnum >= concurrency[c]) || !d[c][delnum].used)
    179 				log1("warning: internal error: delivery report out of range\n");
    180 			else {
    181 				strnum3[fmt_ulong(strnum3, d[c][delnum].delid)] = 0;
    182 				if (dline[c].s[1] == 'Z')
    183 					if (jo[d[c][delnum].j].flagdying) {
    184 						dline[c].s[1] = 'D';
    185 						--dline[c].len;
    186 						while (!stralloc_cats(&dline[c], "I'm not going to try again; this message has been in the queue too long.\n"))
    187 							nomem();
    188 						while (!stralloc_0(&dline[c]))
    189 							nomem();
    190 					}
    191 				switch (dline[c].s[1]) {
    192 				case 'K':
    193 					log3("delivery ", strnum3, ": success: ");
    194 					logsafe(dline[c].s + 2);
    195 					log1("\n");
    196 					markdone(c, jo[d[c][delnum].j].id, d[c][delnum].mpos);
    197 					--jo[d[c][delnum].j].numtodo;
    198 					break;
    199 				case 'Z':
    200 					log3("delivery ", strnum3, ": deferral: ");
    201 					logsafe(dline[c].s + 2);
    202 					log1("\n");
    203 					break;
    204 				case 'D':
    205 					log3("delivery ", strnum3, ": failure: ");
    206 					logsafe(dline[c].s + 2);
    207 					log1("\n");
    208 					addbounce(jo[d[c][delnum].j].id, d[c][delnum].recip.s, dline[c].s + 2);
    209 					markdone(c, jo[d[c][delnum].j].id, d[c][delnum].mpos);
    210 					--jo[d[c][delnum].j].numtodo;
    211 					break;
    212 				default:
    213 					log3("delivery ", strnum3, ": report mangled, will defer\n");
    214 				}
    215 				job_close(d[c][delnum].j);
    216 				d[c][delnum].used = 0;
    217 				--concurrencyused[c];
    218 				del_status();
    219 			}
    220 			dline[c].len = 0;
    221 		}
    222 	}
    223 }
    225 void
    226 del_selprep(int *nfds, fd_set * rfds)
    227 {
    228 	int c;
    229 	for (c = 0; c < CHANNELS; ++c)
    230 		if (flagspawnalive[c]) {
    231 			FD_SET(chanfdin[c], rfds);
    232 			if (*nfds <= chanfdin[c])
    233 				*nfds = chanfdin[c] + 1;
    234 		}
    235 }
    237 void
    238 del_do(fd_set * rfds)
    239 {
    240 	int c;
    241 	for (c = 0; c < CHANNELS; ++c)
    242 		if (flagspawnalive[c])
    243 			if (FD_ISSET(chanfdin[c], rfds))
    244 				del_dochan(c);
    245 }