6#include "dawn/io/notifier.hxx"
10#include "dawn/common/poll_loop.hxx"
11#include "dawn/io/common.hxx"
// CIONotifier::updatePfds() -- fills the poll descriptor array from the
// registered notifier groups.
// NOTE(review): this listing is not contiguous; statements between the
// numbered fragments (braces, declarations of `fd` and `i`, the validity
// check) are not visible here.
17void CIONotifier::updatePfds()
// Outer pass: one iteration per notifier group stored in vnote.
23 for (
const SIONotifierPriv &tmp : vnote)
// Inner pass over the users attached to the current group; the loop body is
// not visible in this excerpt.
30 for (
const SIONotifier &n : tmp.user)
// The descriptor for a group is taken from the IO object of its first user.
39 fd = tmp.user[0].io->getFd();
// The poll entry is armed for readable-data events only.
46 pfds[i].events = POLLIN;
// Error report for an unusable descriptor: prints the FD and the IO object's
// 32-bit identifier; per the message, that notifier is skipped.
53 DAWNERR(
"Invalid FD %d from IO 0x%08" PRIx32
", skipping notifier\n",
55 tmp.user[0].io->getIdV());
// CIONotifier::thread() -- worker routine. The visible fragments install
// three capture-less lambdas (beforePoll, afterPoll, onPollReady) on a
// `callbacks` object; each receives its context through the opaque `priv`
// pointer.
// NOTE(review): this listing is not contiguous; braces, return statements and
// the poll-loop invocation itself are not visible here.
69void CIONotifier::thread()
// Hook invoked before each poll() call.
82 callbacks.
beforePoll = [](
void *priv,
struct pollfd *pollfds, nfds_t nfds) ->
int
// `priv` is cast back to a pointer of ctx's type to reach the owning object.
91 self =
static_cast<decltype(ctx) *
>(priv)->self;
// pfdsUpdate is read with load() and then cleared.
// NOTE(review): presumably the poll array is refreshed in between -- that
// call is not visible in this excerpt; confirm.
101 if (self->pfdsUpdate.load())
104 self->pfdsUpdate =
false;
// Hook invoked after each poll() return; its body is not visible here.
110 callbacks.
afterPoll = [](
void *priv,
struct pollfd *pollfds, nfds_t nfds,
int ret)
// Hook invoked when poll() reports ready descriptors.
118 callbacks.
onPollReady = [](
void *priv,
struct pollfd *pollfds, nfds_t nfds,
int pollRet) ->
int
// Guard: nothing to do without a context, without descriptors, or when poll
// reported no ready entries / an error (pollRet <= 0).
124 if (priv ==
nullptr || pollfds ==
nullptr || nfds == 0 || pollRet <= 0)
129 self =
static_cast<decltype(ctx) *
>(priv)->self;
// pfdsLock is held while vnote is walked.
// NOTE(review): lock()/unlock() are called manually; if any early exit or
// throwing callback exists between them the lock would stay held -- a scoped
// guard may be safer. Confirm against the full source.
138 self->pfdsLock.lock();
// Index i addresses pollfds[i] and vnote[i] together.
140 for (i = 0; i < self->pfdsLen; i++)
142 if (pollfds[i].revents & POLLIN)
// Data is read once per group, through the first user's IO object, into the
// group's buffer using the group's batch value.
145 self->vnote[i].user[0].io->getData(*self->vnote[i].data, self->vnote[i].batch);
// Every user registered on the group gets its callback invoked with its own
// private pointer and the group's data buffer.
147 for (SIONotifier &user : self->vnote[i].user)
151 user.cb(user.priv, self->vnote[i].data);
// The returned-events field is reset to 0.
155 pollfds[i].revents = 0;
167 self->pfdsLock.unlock();
// Destructor. The only visible fragment is a pass over every entry of vnote.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// releases per-group resources -- confirm against the full source.
174CIONotifier::~CIONotifier()
186 for (
const auto &n : vnote)
// CIONotifier::start() -- prepares the poll descriptor array.
// NOTE(review): this listing is not contiguous; the allocation check, the
// thread start-up and the return statements are not visible here.
203int CIONotifier::start()
// One poll entry per notifier group.
207 pfdsLen = vnote.size();
// Non-throwing array allocation; the trailing () value-initializes every
// pollfd element.
208 pfds =
new (std::nothrow) pollfd[pfdsLen]();
// Error report for the allocation-failure path (the condition guarding it is
// not visible in this excerpt).
211 DAWNERR(
"Failed to allocate poll array\n");
// CIONotifier::stop() -- only the signature is visible in this excerpt; the
// body is not shown.
// NOTE(review): the fragments that follow in the listing carry numbers from
// 240 upward and refer to a parameter `n`, so they appear to belong to a
// different function, not to stop() -- confirm against the full source.
225int CIONotifier::stop()
// Fragments from listing lines 240-293. The enclosing signature is not
// visible here.
// NOTE(review): the use of `n`, the push_back into vnote[...].user and the
// "register new notifier" message suggest this is the notifier registration
// routine (regNotifier) -- confirm against the full source.
// Error report for a registration without an IO object (the check itself is
// not visible in this excerpt).
240 DAWNERR(
"IO pointer is null\n");
// Error report naming the IO object (by 32-bit id) for which isNotify is
// false.
246 DAWNERR(
"isNotify=false for IO 0x%08" PRIx32
"\n", n.
io->
getIdV());
// Search the existing groups for one whose first user refers to the same IO
// object as the new registration.
252 for (
const auto &v : vnote)
254 if (n.
io == v.user[0].io)
// The new user is appended to group `update`.
// NOTE(review): presumably `update` is the index found by the search above;
// its assignment is not visible here.
269 vnote[update].user.push_back(n);
// A registration that needs a new group is refused once the worker thread
// object exists.
278 if (threadCtl.hasThreadObject())
280 DAWNERR(
"Cannot register new notifier after thread started\n");
// New group `np`: the registration becomes its first user and the batch
// value is taken from the IO object.
285 np.user.push_back(n);
289 np.batch = n.
io->getNotifyBatch();
// The group's data buffer must be non-null; the allocation call that sets
// np.data is not visible in this excerpt.
291 if (np.data ==
nullptr)
293 DAWNERR(
"Failed to allocate data for notifier\n");
// Fragments from listing lines 308-326. The enclosing signature is not
// visible here.
// NOTE(review): the names `io` and `data` match a function taking an IO
// pointer and a data pointer (likely notifyData) -- confirm against the full
// source.
// Guard: both the IO pointer and the data pointer must be non-null.
308 if (io ==
nullptr || data ==
nullptr)
// Scan all groups; a group is passed over when it has no users or when its
// first user refers to a different IO object.
315 for (
auto &entry : vnote)
317 if (entry.user.empty() || entry.user[0].io != io)
// The user's callback is invoked with its private pointer and the
// caller-supplied data (the loop that yields `user` is not visible here).
326 user.cb(user.priv, data);
Base class for all I/O objects.
virtual bool isNotify() const =0
Check if IO supports notifications.
io_ddata_t * ddata_alloc(size_t batch, size_t chunk_size=0)
Allocate data buffer for this I/O.
I/O notification handler with poll-based event delivery.
int notifyData(CIOCommon *io, io_ddata_t *data)
Emit an immediate notification for already-available data.
int regNotifier(SIONotifier n)
Register I/O notification callback.
SObjectId::ObjectId getIdV() const
Get object identifier as raw 32-bit value.
static int run(CThreadedObject &threadCtl, struct pollfd *pfds, nfds_t nfds, int timeoutMs, const SPollLoopCallbacks &callbacks, void *priv)
Run poll loop until quit is requested.
int threadStop()
Stop the worker thread.
int threadStart()
Start the worker thread.
void setThreadFunc(Func &&func)
Assign the function executed by threadStart().
Out-of-tree user-extension hooks for Dawn.
Notifier registration structure.
CIOCommon * io
I/O object pointer.
Callback set for poll-based worker loops.
void(* afterPoll)(void *priv, struct pollfd *pfds, nfds_t nfds, int ret)
Optional hook called after each poll() return.
int(* onPollReady)(void *priv, struct pollfd *pfds, nfds_t nfds, int pollRet)
Optional hook called when poll() reports ready descriptors.
int(* beforePoll)(void *priv, struct pollfd *pfds, nfds_t nfds)
Optional hook called before each poll() call.
Heap-allocated dynamic I/O data buffer.