Dawn Framework 1.0
Universal data acquisition framework for embedded systems
stream_notifier.cxx
1// dawn/src/io/stream_notifier.cxx
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5
6#include "dawn/io/stream_notifier.hxx"
7
8#include <new>
9
10#include "dawn/common/poll_loop.hxx"
11#include "dawn/io/common.hxx"
12
13using namespace dawn;
14
15CStreamNotifier::CStreamNotifier()
16 : io(nullptr)
17 , data(nullptr)
18 , batch(1)
19 , threadCtl()
20{
21}
22
23CStreamNotifier::~CStreamNotifier()
24{
25 // Make sure thread is stopped
26
27 stop();
28
29 // Free allocated data
30
31 if (data)
32 {
33 delete data;
34 }
35
36 // Clear registered callbacks
37
38 users.clear();
39}
40
41void CStreamNotifier::thread()
42{
43 SPollLoopCallbacks callbacks;
44 struct pollfd pfd;
45
46 struct
47 {
48 CStreamNotifier *self;
49 } ctx;
50
51 ctx.self = this;
52
53 // Set up single file descriptor for polling
54
55 pfd.fd = io->getFd();
56 pfd.events = POLLIN;
57 pfd.revents = 0;
58
59 // No pre/post-poll hooks needed for single-fd stream
60
61 callbacks.beforePoll = nullptr;
62 callbacks.afterPoll = nullptr;
63
64 callbacks.onPollReady = [](void *priv, struct pollfd *pollfds, nfds_t nfds, int pollRet) -> int
65 {
66 CStreamNotifier *self;
67
68 (void)pollfds;
69 (void)nfds;
70 (void)pollRet;
71
72 if (priv == nullptr)
73 {
74 return -EINVAL;
75 }
76
77 self = static_cast<decltype(ctx) *>(priv)->self;
78 if (self == nullptr)
79 {
80 return -EINVAL;
81 }
82
83 // Read data with configured batch count
84
85 self->io->getData(*self->data, self->batch);
86
87 // Dispatch to all registered callbacks
88
89 for (SIONotifier &user : self->users)
90 {
91 if (user.cb)
92 {
93 user.cb(user.priv, self->data);
94 }
95 }
96
97 return OK;
98 };
99
100 CPollLoopRunner::run(threadCtl, &pfd, 1, DAWN_POLL_TIMEOUT_MS, callbacks, &ctx);
101}
102
103int CStreamNotifier::start()
104{
105 if (io == nullptr)
106 {
107 DAWNERR("No IO bound to stream notifier\n");
108 return -EINVAL;
109 }
110
111 // Start notifier thread
112
113 threadCtl.setThreadFunc([this]() { thread(); });
114 return threadCtl.threadStart();
115}
116
117int CStreamNotifier::stop()
118{
119 // Stop notifier thread
120
121 return threadCtl.threadStop();
122}
123
125{
126 if (n.io == nullptr)
127 {
128 DAWNERR("IO pointer is null\n");
129 return -EINVAL;
130 }
131
132 if (!n.io->isNotify())
133 {
134 DAWNERR("isNotify=false!\n");
135 return -EINVAL;
136 }
137
138 // Stream notifier is 1:1 with IO
139
140 if (io != nullptr && io != n.io)
141 {
142 DAWNERR("Stream notifier already bound to different IO\n");
143 return -EINVAL;
144 }
145
146 // First registration: bind IO and allocate data buffer
147
148 if (io == nullptr)
149 {
150 if (threadCtl.hasThreadObject())
151 {
152 DAWNERR("Cannot bind new IO after thread started\n");
153 return -EPERM;
154 }
155
156 io = n.io;
157 batch = n.io->getNotifyBatch();
158 data = n.io->ddata_alloc(batch);
159 if (data == nullptr)
160 {
161 DAWNERR("Failed to allocate data for stream notifier\n");
162 io = nullptr;
163 return -ENOMEM;
164 }
165 }
166
167 // Append callback
168
169 users.push_back(n);
170
171 return OK;
172}
int getData(IODataCmn &data, size_t len, size_t offset=0)
Get data from I/O (public interface with stats tracking).
Definition common.hxx:353
virtual bool isNotify() const =0
Check if IO supports notifications.
virtual int getFd() const
Get file descriptor for notifications.
Definition common.hxx:425
io_ddata_t * ddata_alloc(size_t batch, size_t chunk_size=0)
Allocate data buffer for this I/O.
Definition common.cxx:247
static int run(CThreadedObject &threadCtl, struct pollfd *pfds, nfds_t nfds, int timeoutMs, const SPollLoopCallbacks &callbacks, void *priv)
Run poll loop until quit is requested.
Definition poll_loop.hxx:61
Stream-based I/O notifier with dedicated thread per I/O.
int regNotifier(SIONotifier n)
Register I/O notification callback.
int threadStop()
Stop the worker thread.
Definition thread.cxx:240
int threadStart()
Start the worker thread.
Definition thread.cxx:166
void setThreadFunc(Func &&func)
Assign the function executed by threadStart().
Definition thread.hxx:100
Out-of-tree user-extension hooks for Dawn.
Definition bindable.hxx:13
Notifier registration structure.
Definition inotifier.hxx:46
CIOCommon * io
I/O object pointer.
Definition inotifier.hxx:62
Callback set for poll-based worker loops.
Definition poll_loop.hxx:28
void(* afterPoll)(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
Optional hook called after each poll() return.
Definition poll_loop.hxx:39
int(* onPollReady)(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
Optional hook called when poll() reports ready descriptors.
Definition poll_loop.hxx:45
int(* beforePoll)(void *priv, struct pollfd *pfds, nfds_t nfds)
Optional hook called before each poll() call.
Definition poll_loop.hxx:33