deliveries.c (5123B)
1 #include "mxf-send.h" 2 /* this file is too long ---------------------------------------- DELIVERIES */ 3 4 struct del { 5 int used; 6 int j; 7 unsigned long delid; 8 seek_pos mpos; 9 stralloc recip; 10 } 11 ; 12 13 unsigned long masterdelid = 1; 14 unsigned int concurrency[CHANNELS] = {10, 20}; 15 unsigned int concurrencyused[CHANNELS] = {0, 0}; 16 struct del *d[CHANNELS]; 17 stralloc dline[CHANNELS]; 18 char delbuf[2048]; 19 20 void 21 del_status() 22 { 23 int c; 24 25 log1("status:"); 26 for (c = 0; c < CHANNELS; ++c) { 27 strnum2[fmt_ulong(strnum2, (unsigned long)concurrencyused[c])] = 0; 28 strnum3[fmt_ulong(strnum3, (unsigned long)concurrency[c])] = 0; 29 qslog2(chanstatusmsg[c], strnum2); 30 qslog2("/", strnum3); 31 } 32 if (flagexitasap) 33 log1(" exitasap"); 34 log1("\n"); 35 } 36 37 void 38 del_init() 39 { 40 int c; 41 unsigned int i; 42 for (c = 0; c < CHANNELS; ++c) { 43 flagspawnalive[c] = 1; 44 while (!(d[c] = (struct del *)alloc(concurrency[c] * sizeof(struct del)))) 45 nomem(); 46 for (i = 0; i < concurrency[c]; ++i) { 47 d[c][i].used = 0; 48 d[c][i].recip.s = 0; 49 } 50 dline[c].s = 0; 51 while (!stralloc_copys(&dline[c], "")) 52 nomem(); 53 } 54 del_status(); 55 } 56 57 int 58 del_canexit() 59 { 60 int c; 61 for (c = 0; c < CHANNELS; ++c) 62 if (flagspawnalive[c]) /* if dead, nothing we can do about 63 * its jobs */ 64 if (concurrencyused[c]) 65 return 0; 66 return 1; 67 } 68 69 int 70 del_avail(int c) 71 { 72 return flagspawnalive[c] && comm_canwrite(c) && (concurrencyused[c] < concurrency[c]); 73 } 74 75 void 76 del_start(int j, seek_pos mpos, char *recip) 77 { 78 unsigned int i; 79 int c; 80 81 c = jo[j].channel; 82 if (!flagspawnalive[c]) 83 return; 84 if (!comm_canwrite(c)) 85 return; 86 87 for (i = 0; i < concurrency[c]; ++i) 88 if (!d[c][i].used) 89 break; 90 if (i == concurrency[c]) 91 return; 92 93 if (!stralloc_copys(&d[c][i].recip, recip)) { 94 nomem(); 95 return; 96 } 97 if (!stralloc_0(&d[c][i].recip)) { 98 nomem(); 99 return; 100 } 101 d[c][i].j = j; 102 ++jo[j].refs; 103 d[c][i].delid = masterdelid++; 104 d[c][i].mpos = mpos; 105 d[c][i].used = 1; 106 ++concurrencyused[c]; 107 108 comm_write(c, i, jo[j].id, jo[j].sender.s, recip); 109 110 strnum2[fmt_ulong(strnum2, d[c][i].delid)] = 0; 111 strnum3[fmt_ulong(strnum3, jo[j].id)] = 0; 112 qslog2("starting delivery ", strnum2); 113 log3(": msg ", strnum3, tochan[c]); 114 logsafe(recip); 115 log1("\n"); 116 del_status(); 117 } 118 119 void 120 markdone(int c, unsigned long id, seek_pos pos) 121 { 122 struct stat st; 123 int fd; 124 fnmake_chanaddr(id, c, &fn); 125 for (;;) { 126 fd = open_write(fn.s); 127 if (fd == -1) 128 break; 129 if (fstat(fd, &st) == -1) { 130 close(fd); 131 break; 132 } 133 if (seek_set(fd, pos) == -1) { 134 close(fd); 135 break; 136 } 137 if (write(fd, "D", 1) != 1) { 138 close(fd); 139 break; 140 } 141 /* 142 * further errors -> double delivery without us knowing about 143 * it, oh well 144 */ 145 close(fd); 146 return; 147 } 148 log3("warning: trouble marking ", fn.s, "; message will be delivered twice!\n"); 149 } 150 151 void 152 del_dochan(int c) 153 { 154 int r; 155 char ch; 156 int i; 157 int delnum; 158 r = read(chanfdin[c], delbuf, sizeof(delbuf)); 159 if (r == -1) 160 return; 161 if (r == 0) { 162 spawndied(c); 163 return; 164 } 165 for (i = 0; i < r; ++i) { 166 ch = delbuf[i]; 167 while (!stralloc_append(&dline[c], &ch)) 168 nomem(); 169 if (dline[c].len > REPORTMAX) 170 dline[c].len = REPORTMAX; 171 /* 172 * qmail-lspawn and qmail-rspawn are responsible for keeping 173 * it short 174 */ 175 /* but from a security point of view, we don't trust rspawn */ 176 if (!ch && (dline[c].len > 1)) { 177 delnum = (unsigned int)(unsigned char)dline[c].s[0]; 178 if ((delnum < 0) || (delnum >= concurrency[c]) || !d[c][delnum].used) 179 log1("warning: internal error: delivery report out of range\n"); 180 else { 181 strnum3[fmt_ulong(strnum3, d[c][delnum].delid)] = 0; 182 if (dline[c].s[1] == 'Z') 183 if (jo[d[c][delnum].j].flagdying) { 184 dline[c].s[1] = 'D'; 185 --dline[c].len; 186 while (!stralloc_cats(&dline[c], "I'm not going to try again; this message has been in the queue too long.\n")) 187 nomem(); 188 while (!stralloc_0(&dline[c])) 189 nomem(); 190 } 191 switch (dline[c].s[1]) { 192 case 'K': 193 log3("delivery ", strnum3, ": success: "); 194 logsafe(dline[c].s + 2); 195 log1("\n"); 196 markdone(c, jo[d[c][delnum].j].id, d[c][delnum].mpos); 197 --jo[d[c][delnum].j].numtodo; 198 break; 199 case 'Z': 200 log3("delivery ", strnum3, ": deferral: "); 201 logsafe(dline[c].s + 2); 202 log1("\n"); 203 break; 204 case 'D': 205 log3("delivery ", strnum3, ": failure: "); 206 logsafe(dline[c].s + 2); 207 log1("\n"); 208 addbounce(jo[d[c][delnum].j].id, d[c][delnum].recip.s, dline[c].s + 2); 209 markdone(c, jo[d[c][delnum].j].id, d[c][delnum].mpos); 210 --jo[d[c][delnum].j].numtodo; 211 break; 212 default: 213 log3("delivery ", strnum3, ": report mangled, will defer\n"); 214 } 215 job_close(d[c][delnum].j); 216 d[c][delnum].used = 0; 217 --concurrencyused[c]; 218 del_status(); 219 } 220 dline[c].len = 0; 221 } 222 } 223 } 224 225 void 226 del_selprep(int *nfds, fd_set * rfds) 227 { 228 int c; 229 for (c = 0; c < CHANNELS; ++c) 230 if (flagspawnalive[c]) { 231 FD_SET(chanfdin[c], rfds); 232 if (*nfds <= chanfdin[c]) 233 *nfds = chanfdin[c] + 1; 234 } 235 } 236 237 void 238 del_do(fd_set * rfds) 239 { 240 int c; 241 for (c = 0; c < CHANNELS; ++c) 242 if (flagspawnalive[c]) 243 if (FD_ISSET(chanfdin[c], rfds)) 244 del_dochan(c); 245 }