Dawn Framework 1.0
Universal data acquisition framework for embedded systems
notifier.cxx
1// dawn/src/io/notifier.cxx
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5
6#include "dawn/io/notifier.hxx"
7
8#include <new>
9
10#include "dawn/common/poll_loop.hxx"
11#include "dawn/io/common.hxx"
12
13using namespace dawn;
14
15// Update poll array.
16
17void CIONotifier::updatePfds()
18{
19 size_t i = 0;
20
21 pfdsLock.lock();
22
23 for (const SIONotifierPriv &tmp : vnote)
24 {
25 bool found = false;
26 int fd;
27
28 // Check for at least one valid callback
29
30 for (const SIONotifier &n : tmp.user)
31 {
32 if (n.cb)
33 {
34 found = true;
35 break;
36 }
37 }
38
39 fd = tmp.user[0].io->getFd();
40
41 // Add to poll array if callback is registered and FD is valid
42
43 if (found && fd >= 0)
44 {
45 pfds[i].fd = fd;
46 pfds[i].events = POLLIN;
47 pfds[i].revents = 0;
48 }
49 else
50 {
51 if (fd < 0)
52 {
53 DAWNERR("Invalid FD %d from IO 0x%08" PRIx32 ", skipping notifier\n",
54 fd,
55 tmp.user[0].io->getIdV());
56 }
57
58 pfds[i].fd = -1;
59 }
60
61 // Next item
62
63 i++;
64 }
65
66 pfdsLock.unlock();
67}
68
69void CIONotifier::thread()
70{
71 SPollLoopCallbacks callbacks;
72
73 struct
74 {
75 CIONotifier *self;
76 } ctx;
77
78 ctx.self = this;
79
80 /* Loop until stop called */
81
82 callbacks.beforePoll = [](void *priv, struct pollfd *pollfds, nfds_t nfds) -> int
83 {
84 CIONotifier *self;
85
86 if (priv == nullptr)
87 {
88 return -EINVAL;
89 }
90
91 self = static_cast<decltype(ctx) *>(priv)->self;
92 if (self == nullptr)
93 {
94 return -EINVAL;
95 }
96
97 (void)pollfds;
98 (void)nfds;
99
100 // Update poll array if requested
101 if (self->pfdsUpdate.load())
102 {
103 self->updatePfds();
104 self->pfdsUpdate = false;
105 }
106
107 return OK;
108 };
109
110 callbacks.afterPoll = [](void *priv, struct pollfd *pollfds, nfds_t nfds, int ret)
111 {
112 (void)priv;
113 (void)pollfds;
114 (void)nfds;
115 (void)ret;
116 };
117
118 callbacks.onPollReady = [](void *priv, struct pollfd *pollfds, nfds_t nfds, int pollRet) -> int
119 {
120 CIONotifier *self;
121 size_t i;
122 int j;
123
124 if (priv == nullptr || pollfds == nullptr || nfds == 0 || pollRet <= 0)
125 {
126 return -EINVAL;
127 }
128
129 self = static_cast<decltype(ctx) *>(priv)->self;
130 if (self == nullptr)
131 {
132 return -EINVAL;
133 }
134
135 j = 0;
136
137 // Lock poll data until we handle all requests
138 self->pfdsLock.lock();
139
140 for (i = 0; i < self->pfdsLen; i++)
141 {
142 if (pollfds[i].revents & POLLIN)
143 {
144 // Read data with configured batch count
145 self->vnote[i].user[0].io->getData(*self->vnote[i].data, self->vnote[i].batch);
146
147 for (SIONotifier &user : self->vnote[i].user)
148 {
149 if (user.cb)
150 {
151 user.cb(user.priv, self->vnote[i].data);
152 }
153 }
154
155 pollfds[i].revents = 0;
156 j++;
157 }
158
159 // poll() returns the count of fds with events; once we've handled
160 // that many we can stop scanning the rest.
161 if (j >= pollRet)
162 {
163 break;
164 }
165 }
166
167 self->pfdsLock.unlock();
168 return OK;
169 };
170
171 CPollLoopRunner::run(threadCtl, pfds, pfdsLen, DAWN_POLL_TIMEOUT_MS, callbacks, &ctx);
172}
173
174CIONotifier::~CIONotifier()
175{
176 // Make sure thread is stopped
177
178 stop();
179
180 // Clear flag
181
182 pfdsUpdate = false;
183
184 // Free allocated data
185
186 for (const auto &n : vnote)
187 {
188 if (n.data)
189 {
190 delete n.data;
191 }
192 }
193
194 // Clear registered notifications
195
196 vnote.clear();
197
198 // Free poll array
199
200 delete[] pfds;
201}
202
203int CIONotifier::start()
204{
205 // Allocate poll array
206
207 pfdsLen = vnote.size();
208 pfds = new (std::nothrow) pollfd[pfdsLen]();
209 if (pfds == nullptr)
210 {
211 DAWNERR("Failed to allocate poll array\n");
212 return -ENOMEM;
213 }
214
215 // Initialize notifiers
216
217 updatePfds();
218
219 // Start notifier thread
220
221 threadCtl.setThreadFunc([this]() { thread(); });
222 return threadCtl.threadStart();
223}
224
225int CIONotifier::stop()
226{
227 // Stop notifier thread
228
229 return threadCtl.threadStop();
230}
231
233{
234 SIONotifierPriv np;
235 int update = -1;
236 int i = 0;
237
238 if (n.io == nullptr)
239 {
240 DAWNERR("IO pointer is null\n");
241 return -EINVAL;
242 }
243
244 if (!n.io->isNotify())
245 {
246 DAWNERR("isNotify=false for IO 0x%08" PRIx32 "\n", n.io->getIdV());
247 return -EINVAL;
248 }
249
250 // Check if we need update old entry
251
252 for (const auto &v : vnote)
253 {
254 if (n.io == v.user[0].io)
255 {
256 update = i;
257 break;
258 }
259
260 i++;
261 }
262
263 pfdsLock.lock();
264
265 // Update vnote entry or add to vector
266
267 if (update != -1)
268 {
269 vnote[update].user.push_back(n);
270
271 pfdsUpdate = true;
272 }
273 else
274 {
275 // regNotifier() can be called only before start was called
276 // update vnote when thread is running is not supported now
277
278 if (threadCtl.hasThreadObject())
279 {
280 DAWNERR("Cannot register new notifier after thread started\n");
281 pfdsLock.unlock();
282 return -EPERM;
283 }
284
285 np.user.push_back(n);
286
287 // Allocate data handler with configured batch count
288
289 np.batch = n.io->getNotifyBatch();
290 np.data = n.io->ddata_alloc(np.batch);
291 if (np.data == nullptr)
292 {
293 DAWNERR("Failed to allocate data for notifier\n");
294 pfdsLock.unlock();
295 return -ENOMEM;
296 }
297
298 vnote.push_back(np);
299 }
300
301 pfdsLock.unlock();
302
303 return OK;
304}
305
307{
308 if (io == nullptr || data == nullptr)
309 {
310 return -EINVAL;
311 }
312
313 pfdsLock.lock();
314
315 for (auto &entry : vnote)
316 {
317 if (entry.user.empty() || entry.user[0].io != io)
318 {
319 continue;
320 }
321
322 for (SIONotifier &user : entry.user)
323 {
324 if (user.cb)
325 {
326 user.cb(user.priv, data);
327 }
328 }
329
330 pfdsLock.unlock();
331 return OK;
332 }
333
334 pfdsLock.unlock();
335 return -ENOENT;
336}
Base class for all I/O objects.
Definition common.hxx:27
virtual bool isNotify() const =0
Check if IO supports notifications.
io_ddata_t * ddata_alloc(size_t batch, size_t chunk_size=0)
Allocate data buffer for this I/O.
Definition common.cxx:247
I/O notification handler with poll-based event delivery.
Definition notifier.hxx:31
int notifyData(CIOCommon *io, io_ddata_t *data)
Emit an immediate notification for already-available data.
Definition notifier.cxx:306
int regNotifier(SIONotifier n)
Register I/O notification callback.
Definition notifier.cxx:232
SObjectId::ObjectId getIdV() const
Get object identifier as raw 32-bit value.
Definition object.cxx:155
static int run(CThreadedObject &threadCtl, struct pollfd *pfds, nfds_t nfds, int timeoutMs, const SPollLoopCallbacks &callbacks, void *priv)
Run poll loop until quit is requested.
Definition poll_loop.hxx:61
int threadStop()
Stop the worker thread.
Definition thread.cxx:240
int threadStart()
Start the worker thread.
Definition thread.cxx:166
void setThreadFunc(Func &&func)
Assign the function executed by threadStart().
Definition thread.hxx:100
Out-of-tree user-extension hooks for Dawn.
Definition bindable.hxx:13
Notifier registration structure.
Definition inotifier.hxx:46
CIOCommon * io
I/O object pointer.
Definition inotifier.hxx:62
Callback set for poll-based worker loops.
Definition poll_loop.hxx:28
void(* afterPoll)(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
Optional hook called after each poll() return.
Definition poll_loop.hxx:39
int(* onPollReady)(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
Optional hook called when poll() reports ready descriptors.
Definition poll_loop.hxx:45
int(* beforePoll)(void *priv, struct pollfd *pfds, nfds_t nfds)
Optional hook called before each poll() call.
Definition poll_loop.hxx:33
Heap-allocated dynamic I/O data buffer.
Definition ddata.hxx:21