Dawn Framework 1.0
Universal data acquisition framework for embedded systems
simple.cxx
1// dawn/src/proto/ipc/simple.cxx
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5
6#include "dawn/proto/ipc/simple.hxx"
7
8#include "dawn/common/poll_loop.hxx"
9
10#include <cerrno>
11#include <cstring>
12
13#include <fcntl.h>
14#include <poll.h>
15#include <sys/stat.h>
16#include <unistd.h>
17
18using namespace dawn;
19
20int CProtoIpc::sendFrame(uint8_t cmd, const uint8_t *payload, size_t len)
21{
22 uint8_t frame[FRAME_MAX_PAYLOAD + FRAME_MIN_LEN];
23 size_t tosend;
24 size_t sent;
25 ssize_t ret;
26 uint16_t crc;
27
28 tosend = FRAME_MIN_LEN + len;
29 sent = 0;
30
31 if (len > FRAME_MAX_PAYLOAD || txfd < 0)
32 {
33 DAWNERR("IPC sendFrame invalid state cmd=0x%02x len=%zu fd=%d\n", cmd, len, txfd);
34 return -EINVAL;
35 }
36
37 frame[0] = FRAME_SYNC;
38 frame[1] = (uint8_t)(len & 0xFF);
39 frame[2] = (uint8_t)((len >> 8) & 0xFF);
40 frame[3] = cmd;
41
42 if (len > 0)
43 {
44 std::memcpy(&frame[4], payload, len);
45 }
46
47 crc = calculateCrc(&frame[3], 1 + len);
48 frame[4 + len] = (uint8_t)(crc & 0xFF);
49 frame[5 + len] = (uint8_t)((crc >> 8) & 0xFF);
50
51 while (sent < tosend)
52 {
53 ret = write(txfd, &frame[sent], tosend - sent);
54 if (ret < 0)
55 {
56 if (errno == EINTR)
57 {
58 continue;
59 }
60
61 DAWNERR("IPC write failed cmd=0x%02x ret=%d errno=%d\n", cmd, (int)ret, errno);
62 return -errno;
63 }
64
65 if (ret == 0)
66 {
67 DAWNERR("IPC write returned zero bytes\n");
68 return -EIO;
69 }
70
71 sent += (size_t)ret;
72 }
73
74 return OK;
75}
76
77void CProtoIpc::thread()
78{
79 struct pollfd fds[1];
80 SPollLoopCallbacks callbacks;
81
82 std::memset(fds, 0, sizeof(fds));
83
84 fds[0].fd = rxfd;
85 fds[0].events = POLLIN;
86 parserPos = 0;
87 parserLen = 0;
88 parserState = 0;
89
90 callbacks.beforePoll = CProtoIpc::cbPollBefore;
91 callbacks.afterPoll = CProtoIpc::cbPollAfter;
92 callbacks.onPollReady = CProtoIpc::cbPollOnReady;
93
94 CPollLoopRunner::run(workerThread(), fds, 1, DAWN_POLL_TIMEOUT_MS, callbacks, this);
95}
96
97int CProtoIpc::pollBefore(struct pollfd *pfds, nfds_t nfds)
98{
99 if (pfds == nullptr || nfds == 0)
100 {
101 return -EINVAL;
102 }
103
104 pfds[0].revents = 0;
105 return OK;
106}
107
108void CProtoIpc::pollAfter(int ret)
109{
110 if (ret < 0 && errno != EINTR)
111 {
112 DAWNERR("IPC poll failed %d\n", -errno);
113 }
114}
115
116int CProtoIpc::pollOnReady(struct pollfd *pfds, nfds_t nfds, int pollRet)
117{
118 uint8_t chunk[64];
119 ssize_t rdret;
120 size_t i;
121 uint8_t byte;
122 int ret;
123
124 if (pfds == nullptr || nfds == 0 || pollRet <= 0)
125 {
126 return -EINVAL;
127 }
128
129 if ((pfds[0].revents & (POLLIN | POLLERR | POLLHUP)) == 0)
130 {
131 return OK;
132 }
133
134 rdret = read(rxfd, chunk, sizeof(chunk));
135 if (rdret < 0)
136 {
137 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
138 {
139 return OK;
140 }
141
142 DAWNERR("IPC read failed ret=%d errno=%d\n", (int)rdret, errno);
143 usleep(10000);
144 return (int)rdret;
145 }
146
147 if (rdret == 0)
148 {
149 usleep(10000);
150 return OK;
151 }
152
153 for (i = 0; i < (size_t)rdret; i++)
154 {
155 byte = chunk[i];
156
157 switch (parserState)
158 {
159 case 0:
160 {
161 if (byte == FRAME_SYNC)
162 {
163 rxbuffer[parserPos] = byte;
164 parserPos = 1;
165 parserLen = 0;
166 parserState = 1;
167 }
168
169 break;
170 }
171
172 case 1:
173 {
174 parserLen = byte;
175 rxbuffer[parserPos] = byte;
176 parserPos = 2;
177 parserState = 2;
178 break;
179 }
180
181 case 2:
182 {
183 parserLen |= (uint16_t)(byte << 8);
184 if (parserLen > FRAME_MAX_PAYLOAD)
185 {
186 parserState = 0;
187 parserPos = 0;
188 continue;
189 }
190
191 rxbuffer[parserPos] = byte;
192 parserPos = 3;
193 parserState = 3;
194 break;
195 }
196
197 case 3:
198 {
199 rxbuffer[parserPos++] = byte;
200
201 if (parserPos >= (size_t)(FRAME_MIN_LEN + parserLen))
202 {
203 ret = handleFrame(rxbuffer, parserPos);
204 if (ret < 0)
205 {
206 DAWNERR("IPC handleFrame failed ret=%d\n", ret);
207 }
208
209 parserState = 0;
210 parserPos = 0;
211 }
212
213 break;
214 }
215 }
216 }
217
218 return OK;
219}
220
221int CProtoIpc::cbPollBefore(void *priv, struct pollfd *pfds, nfds_t nfds)
222{
223 CProtoIpc *self;
224
225 self = static_cast<CProtoIpc *>(priv);
226 if (self == nullptr)
227 {
228 return -EINVAL;
229 }
230
231 return self->pollBefore(pfds, nfds);
232}
233
234void CProtoIpc::cbPollAfter(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
235{
236 CProtoIpc *self;
237
238 (void)pfds;
239 (void)nfds;
240 self = static_cast<CProtoIpc *>(priv);
241 if (self == nullptr)
242 {
243 return;
244 }
245
246 self->pollAfter(ret);
247}
248
249int CProtoIpc::cbPollOnReady(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
250{
251 CProtoIpc *self;
252
253 self = static_cast<CProtoIpc *>(priv);
254 if (self == nullptr)
255 {
256 return -EINVAL;
257 }
258
259 return self->pollOnReady(pfds, nfds, pollRet);
260}
261
262int CProtoIpc::configureDesc(const CDescObject &desc)
263{
264 SObjectCfg::SObjectCfgItem *item = nullptr;
265 size_t offset;
266
267 offset = 0;
268
269 for (size_t i = 0; i < desc.getSize(); i++)
270 {
271 item = desc.objectCfgItemNext(offset);
272
274 {
275 DAWNERR("Unsupported IPC config 0x%08" PRIx32 "\n", item->cfgid.v);
276 return -EINVAL;
277 }
278
279 switch (item->cfgid.s.id)
280 {
281 case PROTO_IPC_CFG_IOBIND:
282 {
283 for (size_t j = 0; j < item->cfgid.s.size;)
284 {
285 SProtoIpcIOBind *tmp;
286
287 tmp = reinterpret_cast<SProtoIpcIOBind *>(item->data + j);
288
289 allocObject(tmp);
290 j += sizeof(SProtoIpcIOBind) / 4;
291 }
292
293 break;
294 }
295
296 case PROTO_IPC_CFG_RX_PATH:
297 {
298 rxPath = reinterpret_cast<const char *>(&item->data);
299 break;
300 }
301
302 case PROTO_IPC_CFG_TX_PATH:
303 {
304 txPath = reinterpret_cast<const char *>(&item->data);
305 break;
306 }
307
308 default:
309 {
310 DAWNERR("Unsupported IPC config 0x%08" PRIx32 "\n", item->cfgid.v);
311 return -EINVAL;
312 }
313 }
314 }
315
316 return OK;
317}
318
319int CProtoIpc::ensureFifo(const char *path)
320{
321 struct stat st;
322 int err;
323 int ret;
324
325 if (!path)
326 {
327 DAWNERR("IPC FIFO path not configured\n");
328 return -EINVAL;
329 }
330
331 ret = mkfifo(path, 0666);
332 if (ret == 0)
333 {
334 return OK;
335 }
336
337 err = errno;
338
339 if (err == EEXIST)
340 {
341 ret = stat(path, &st);
342 if (ret == 0 && S_ISFIFO(st.st_mode))
343 {
344 return OK;
345 }
346
347 if (ret < 0)
348 {
349 err = errno;
350 }
351 else
352 {
353 err = EINVAL;
354 }
355 }
356
357 if (err <= 0)
358 {
359 err = EIO;
360 }
361
362 DAWNERR("Failed to create FIFO %s errno=%d\n", path, err);
363
364 return -err;
365}
366
367int CProtoIpc::fifoInit()
368{
369 int ret;
370
371 ret = ensureFifo(rxPath);
372 if (ret < 0)
373 {
374 return ret;
375 }
376
377 ret = ensureFifo(txPath);
378 if (ret < 0)
379 {
380 return ret;
381 }
382
383 rxfd = open(rxPath, O_RDWR | O_NONBLOCK);
384 if (rxfd < 0)
385 {
386 DAWNERR("Failed to open IPC RX FIFO %s errno=%d\n", rxPath, errno);
387 return -errno;
388 }
389
390 txfd = open(txPath, O_RDWR | O_NONBLOCK);
391 if (txfd < 0)
392 {
393 DAWNERR("Failed to open IPC TX FIFO %s errno=%d\n", txPath, errno);
394 close(rxfd);
395 rxfd = -1;
396 return -errno;
397 }
398
399 DAWNINFO("IPC protocol initialized rx=%s tx=%s\n", rxPath, txPath);
400
401 return OK;
402}
403
404CProtoIpc::~CProtoIpc()
405{
406 deinit();
407}
408
410{
411 int ret;
412
413 ret = configureDesc(getDesc());
414 if (ret != OK)
415 {
416 DAWNERR("IPC configure failed (error %d)\n", ret);
417 return ret;
418 }
419
420 return OK;
421}
422
424{
425 int ret;
426
427 ret = fifoInit();
428 if (ret < 0)
429 {
430 return ret;
431 }
432
433 ret = createBuffers();
434 if (ret < 0)
435 {
436 DAWNERR("failed to create data %d\n", ret);
437 return ret;
438 }
439
440#ifdef CONFIG_DAWN_IO_NOTIFY
441 ret = setupNotifications();
442 if (ret < 0)
443 {
444 DAWNERR("failed to setup notifications %d\n", ret);
445 }
446#endif
447
448 return OK;
449}
450
452{
453#ifdef CONFIG_DAWN_IO_NOTIFY
454 destroyNotifications();
455#endif
456
458
459 if (rxfd >= 0)
460 {
461 close(rxfd);
462 rxfd = -1;
463 }
464
465 if (txfd >= 0)
466 {
467 close(txfd);
468 txfd = -1;
469 }
470
471 if (rxPath)
472 {
473 unlink(rxPath);
474 }
475
476 if (txPath)
477 {
478 unlink(txPath);
479 }
480
481 return OK;
482}
483
485{
486 int ret;
487
488 ret = startWorkerThread([this]() { thread(); });
489 if (ret < 0)
490 {
491 DAWNERR("failed to start thread %d\n", ret);
492 return ret;
493 }
494
495 DAWNINFO("IPC protocol started\n");
496
497 return OK;
498}
499
501{
502#ifdef CONFIG_DAWN_IO_NOTIFY
503 cleanupNotifications();
504#endif
505
507
508 DAWNINFO("IPC protocol stopped\n");
509
510 return OK;
511}
512
514{
515 return workerThreadRunning();
516}
Descriptor wrapper for individual object configuration.
size_t getSize() const
Get number of configuration items for this object.
SObjectCfg::SObjectCfgItem * objectCfgItemNext(size_t &offset) const
Get config item at current offset and advance past it.
CDescObject & getDesc()
Get descriptor object for this object.
Definition object.cxx:190
static int run(CThreadedObject &threadCtl, struct pollfd *pfds, nfds_t nfds, int timeoutMs, const SPollLoopCallbacks &callbacks, void *priv)
Run poll loop until quit is requested.
Definition poll_loop.hxx:61
@ PROTO_CLASS_IPC
FIFO-based local IPC protocol.
Definition common.hxx:83
Simple FIFO-based IPC transport for local Dawn communication.
Definition simple.hxx:30
int doStart()
Start implementation hook.
Definition simple.cxx:484
int deinit()
De-initialize object.
Definition simple.cxx:451
int configure()
Configure object from descriptor data.
Definition simple.cxx:409
int doStop()
Stop implementation hook.
Definition simple.cxx:500
int init()
One-time initialize object after bindings are resolved.
Definition simple.cxx:423
bool hasThread() const
Check if a background thread is active.
Definition simple.cxx:513
static uint8_t FRAME_SYNC
Frame structure constants.
void allocObject(SProtoSimpleIOBind *cfg)
Store an allocated IO binding.
uint16_t calculateCrc(const uint8_t *data, size_t len)
Calculate 16-bit CRC checksum.
int handleFrame(const uint8_t *frame, size_t len)
Process a received frame.
int createBuffers()
Allocate shared per-IO data buffers.
int destroyBuffers()
Destroy shared per-IO data buffers.
bool workerThreadRunning() const
Check if the worker thread is running.
Definition thread.hxx:269
int stopWorkerThread()
Stop the worker thread.
Definition thread.hxx:258
int startWorkerThread(Func &&func)
Start the worker thread with a given function.
Definition thread.hxx:246
CThreadedObject & workerThread()
Get a reference to this thread controller.
Definition thread.hxx:280
Out-of-tree user-extension hooks for Dawn.
Definition bindable.hxx:13
Single configuration item within object.
ObjectCfgData_t data[]
Configuration data array (flexible, size from cfgid.s.size).
UObjectCfgId cfgid
Configuration ID header (type, class, id, size, rw, dtype).
Callback set for poll-based worker loops.
Definition poll_loop.hxx:28
ObjectCfgId v
Raw 32-bit ConfigID value (for storage, comparison).
Definition objectcfg.hxx:82
uint32_t cls
Object class (bits 21-29, max 511).
uint32_t id
Configuration identifier (bits 0-4, max 31).
Definition objectcfg.hxx:94
uint32_t size
Configuration data size in 32-bit words (bits 5-14, max 1023).
struct dawn::SObjectCfg::UObjectCfgId::@10 s
Bit-field structure for named member access.