Dawn Framework 1.0
Universal data acquisition framework for embedded systems
simple.cxx
1// dawn/src/proto/udp/simple.cxx
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5
6#include "dawn/proto/udp/simple.hxx"
7
8#include "dawn/common/poll_loop.hxx"
9
10#include <arpa/inet.h>
11#include <netinet/in.h>
12#include <poll.h>
13#include <sys/socket.h>
14#include <unistd.h>
15
16#include <cstring>
17
18using namespace dawn;
19
20int CProtoUdp::sendFrame(uint8_t cmd, const uint8_t *payload, size_t len)
21{
22 uint8_t frame[FRAME_MAX_PAYLOAD + FRAME_MIN_LEN];
23 size_t tosend;
24 ssize_t ret;
25 uint16_t crc;
26
27 if (len > FRAME_MAX_PAYLOAD || fd < 0)
28 {
29 DAWNERR("UDP sendFrame invalid state cmd=0x%02x len=%zu fd=%d\n", cmd, len, fd);
30 return -1;
31 }
32
33 tosend = FRAME_MIN_LEN + len;
34
35 frame[0] = FRAME_SYNC;
36 frame[1] = (uint8_t)(len & 0xFF);
37 frame[2] = (uint8_t)((len >> 8) & 0xFF);
38 frame[3] = cmd;
39
40 if (len > 0)
41 {
42 std::memcpy(&frame[4], payload, len);
43 }
44
45 crc = calculateCrc(&frame[3], 1 + len);
46 frame[4 + len] = (uint8_t)(crc & 0xFF);
47 frame[5 + len] = (uint8_t)((crc >> 8) & 0xFF);
48
49 {
50 std::lock_guard<std::mutex> lock(senderMutex);
51 if (sender.sin_port == 0)
52 {
53 DAWNERR("UDP sendFrame no sender cmd=0x%02x len=%zu\n", cmd, len);
54 return -1;
55 }
56
57 DAWNINFO("UDP sendFrame cmd=0x%02x len=%zu to port=%u ip=0x%08" PRIx32 "\n",
58 cmd,
59 len,
60 ntohs(sender.sin_port),
61 ntohl(sender.sin_addr.s_addr));
62
63 ret =
64 sendto(fd, frame, tosend, 0, reinterpret_cast<struct sockaddr *>(&sender), sizeof(sender));
65 }
66
67 if (ret < 0 || static_cast<size_t>(ret) != tosend)
68 {
69 DAWNERR("UDP sendto failed cmd=0x%02x ret=%zd exp=%zu errno=%d\n", cmd, ret, tosend, errno);
70 return ret < 0 ? static_cast<int>(ret) : -EIO;
71 }
72
73 return OK;
74}
75
76void CProtoUdp::thread()
77{
78 SPollLoopCallbacks callbacks;
79 struct pollfd fds[1];
80
81 std::memset(fds, 0, sizeof(fds));
82
83 fds[0].fd = fd;
84 fds[0].events = POLLIN;
85
86 callbacks.beforePoll = CProtoUdp::cbPollBefore;
87 callbacks.afterPoll = CProtoUdp::cbPollAfter;
88 callbacks.onPollReady = CProtoUdp::cbPollOnReady;
89
90 CPollLoopRunner::run(workerThread(), fds, 1, DAWN_POLL_TIMEOUT_MS, callbacks, this);
91}
92
93int CProtoUdp::pollBefore(struct pollfd *pfds, nfds_t nfds)
94{
95 if (pfds == nullptr || nfds == 0)
96 {
97 return -EINVAL;
98 }
99
100 pfds[0].revents = 0;
101 return OK;
102}
103
104void CProtoUdp::pollAfter(int ret)
105{
106 if (ret < 0)
107 {
108 DAWNERR("UDP poll failed %d\n", -errno);
109 }
110}
111
112int CProtoUdp::pollOnReady(struct pollfd *pfds, nfds_t nfds, int pollRet)
113{
114 socklen_t fromlen;
115 ssize_t ret;
116
117 if (pfds == nullptr || nfds == 0 || pollRet <= 0)
118 {
119 return -EINVAL;
120 }
121
122 if ((pfds[0].revents & POLLIN) == 0)
123 {
124 return OK;
125 }
126
127 fromlen = sizeof(sender);
128 {
129 std::lock_guard<std::mutex> lock(senderMutex);
130 ret = recvfrom(
131 fd, rxbuffer, sizeof(rxbuffer), 0, reinterpret_cast<struct sockaddr *>(&sender), &fromlen);
132 }
133
134 if (ret <= 0)
135 {
136 DAWNERR("UDP recvfrom ret=%zd errno=%d\n", ret, errno);
137 usleep(10000);
138 return static_cast<int>(ret);
139 }
140
141 int frameRet = handleFrame(rxbuffer, static_cast<size_t>(ret));
142 if (frameRet < 0)
143 {
144 DAWNERR("UDP handleFrame failed ret=%d\n", frameRet);
145 }
146
147 pfds[0].revents = 0;
148 return OK;
149}
150
151int CProtoUdp::cbPollBefore(void *priv, struct pollfd *pfds, nfds_t nfds)
152{
153 CProtoUdp *self;
154
155 self = static_cast<CProtoUdp *>(priv);
156 if (self == nullptr)
157 {
158 return -EINVAL;
159 }
160
161 return self->pollBefore(pfds, nfds);
162}
163
164void CProtoUdp::cbPollAfter(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
165{
166 CProtoUdp *self;
167
168 (void)pfds;
169 (void)nfds;
170
171 self = static_cast<CProtoUdp *>(priv);
172 if (self == nullptr)
173 {
174 return;
175 }
176
177 self->pollAfter(ret);
178}
179
180int CProtoUdp::cbPollOnReady(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
181{
182 CProtoUdp *self;
183
184 self = static_cast<CProtoUdp *>(priv);
185 if (self == nullptr)
186 {
187 return -EINVAL;
188 }
189
190 return self->pollOnReady(pfds, nfds, pollRet);
191}
192
193int CProtoUdp::configureDesc(const CDescObject &desc)
194{
195 SObjectCfg::SObjectCfgItem *item = nullptr;
196 size_t offset;
197
198 offset = 0;
199
200 for (size_t i = 0; i < desc.getSize(); i++)
201 {
202 item = desc.objectCfgItemNext(offset);
203
205 {
206 DAWNERR("Unsupported UDP config 0x%08" PRIx32 "\n", item->cfgid.v);
207 return -EINVAL;
208 }
209
210 switch (item->cfgid.s.id)
211 {
212 case PROTO_UDP_CFG_IOBIND:
213 {
214 for (size_t j = 0; j < item->cfgid.s.size;)
215 {
216 SProtoUdpIOBind *tmp;
217
218 tmp = reinterpret_cast<SProtoUdpIOBind *>(item->data + j);
219
220 allocObject(tmp);
221 j += sizeof(SProtoUdpIOBind) / 4;
222 }
223
224 break;
225 }
226
227 case PROTO_UDP_CFG_PORT:
228 {
229 port = static_cast<uint16_t>(item->data[0]);
230 break;
231 }
232
233 default:
234 {
235 DAWNERR("Unsupported UDP config 0x%08" PRIx32 "\n", item->cfgid.v);
236 return -EINVAL;
237 }
238 }
239 }
240
241 return OK;
242}
243
244int CProtoUdp::udpInit()
245{
246 struct sockaddr_in addr;
247 int ret;
248
249 fd = socket(AF_INET, SOCK_DGRAM, 0);
250 if (fd < 0)
251 {
252 DAWNERR("Failed to create UDP socket: %d\n", -errno);
253 return -1;
254 }
255
256 std::memset(&addr, 0, sizeof(addr));
257 addr.sin_family = AF_INET;
258 addr.sin_addr.s_addr = INADDR_ANY;
259 addr.sin_port = htons(port);
260
261 ret = bind(fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
262 if (ret < 0)
263 {
264 DAWNERR("Failed to bind UDP socket to port %u: %d\n", port, -errno);
265 close(fd);
266 fd = -1;
267 return -1;
268 }
269
270 DAWNINFO("UDP protocol initialized on port %u\n", port);
271
272 return 0;
273}
274
275CProtoUdp::~CProtoUdp()
276{
277 deinit();
278}
279
281{
282 int ret;
283
284 ret = configureDesc(getDesc());
285 if (ret != OK)
286 {
287 DAWNERR("UDP configure failed (error %d)\n", ret);
288 return ret;
289 }
290
291 return OK;
292}
293
295{
296 int ret;
297
298 ret = udpInit();
299 if (ret < 0)
300 {
301 return ret;
302 }
303
304 ret = createBuffers();
305 if (ret < 0)
306 {
307 DAWNERR("failed to create data %d\n", ret);
308 return ret;
309 }
310
311#ifdef CONFIG_DAWN_IO_NOTIFY
312 ret = setupNotifications();
313 if (ret < 0)
314 {
315 DAWNERR("failed to setup notifications %d\n", ret);
316 }
317#endif
318
319 return OK;
320}
321
323{
324#ifdef CONFIG_DAWN_IO_NOTIFY
325 destroyNotifications();
326#endif
327
329
330 if (fd >= 0)
331 {
332 close(fd);
333 fd = -1;
334 }
335
336 return OK;
337}
338
340{
341 int ret;
342
343 ret = startWorkerThread([this]() { thread(); });
344 if (ret < 0)
345 {
346 DAWNERR("failed to start thread %d\n", ret);
347 return ret;
348 }
349
350 DAWNINFO("UDP protocol started\n");
351
352 return 0;
353}
354
356{
357#ifdef CONFIG_DAWN_IO_NOTIFY
358 cleanupNotifications();
359#endif
360
362
363 DAWNINFO("UDP protocol stopped\n");
364
365 return 0;
366}
367
369{
370 return workerThreadRunning();
371}
Descriptor wrapper for individual object configuration.
size_t getSize() const
Get number of configuration items for this object.
SObjectCfg::SObjectCfgItem * objectCfgItemNext(size_t &offset) const
Get config item at current offset and advance past it.
CDescObject & getDesc()
Get descriptor object for this object.
Definition object.cxx:190
static int run(CThreadedObject &threadCtl, struct pollfd *pfds, nfds_t nfds, int timeoutMs, const SPollLoopCallbacks &callbacks, void *priv)
Run poll loop until quit is requested.
Definition poll_loop.hxx:61
@ PROTO_CLASS_UDP
UDP-based protocol.
Definition common.hxx:79
static uint8_t FRAME_SYNC
Frame structure constants.
void allocObject(SProtoSimpleIOBind *cfg)
Store an allocated IO binding.
uint16_t calculateCrc(const uint8_t *data, size_t len)
Calculate 16-bit CRC checksum.
int handleFrame(const uint8_t *frame, size_t len)
Process a received frame.
int createBuffers()
Allocate shared per-IO data buffers.
int destroyBuffers()
Destroy shared per-IO data buffers.
Simple binary UDP protocol for device communication.
Definition simple.hxx:30
bool hasThread() const
Check if a background thread is active.
Definition simple.cxx:368
int deinit()
De-initialize object.
Definition simple.cxx:322
int init()
One-time initialize object after bindings are resolved.
Definition simple.cxx:294
int configure()
Configure object from descriptor data.
Definition simple.cxx:280
int doStart()
Start implementation hook.
Definition simple.cxx:339
int doStop()
Stop implementation hook.
Definition simple.cxx:355
bool workerThreadRunning() const
Check if the worker thread is running.
Definition thread.hxx:269
int stopWorkerThread()
Stop the worker thread.
Definition thread.hxx:258
int startWorkerThread(Func &&func)
Start the worker thread with a given function.
Definition thread.hxx:246
CThreadedObject & workerThread()
Get a reference to this thread controller.
Definition thread.hxx:280
Out-of-tree user-extension hooks for Dawn.
Definition bindable.hxx:13
Single configuration item within object.
ObjectCfgData_t data[]
Configuration data array (flexible, size from cfgid.s.size).
UObjectCfgId cfgid
Configuration ID header (type, class, id, size, rw, dtype).
Callback set for poll-based worker loops.
Definition poll_loop.hxx:28
void(* afterPoll)(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
Optional hook called after each poll() return.
Definition poll_loop.hxx:39
int(* onPollReady)(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
Optional hook called when poll() reports ready descriptors.
Definition poll_loop.hxx:45
int(* beforePoll)(void *priv, struct pollfd *pfds, nfds_t nfds)
Optional hook called before each poll() call.
Definition poll_loop.hxx:33
ObjectCfgId v
Raw 32-bit ConfigID value (for storage, comparison).
Definition objectcfg.hxx:82
uint32_t cls
Object class (bits 21-29, max 511).
uint32_t id
Configuration identifier (bits 0-4, max 31).
Definition objectcfg.hxx:94
uint32_t size
Configuration data size in 32-bit words (bits 5-14, max 1023).
struct dawn::SObjectCfg::UObjectCfgId::@10 s
Bit-field structure for named member access.