diff options
Diffstat (limited to 'thirdparty/libwebsockets/lib/core/pollfd.c')
-rw-r--r-- | thirdparty/libwebsockets/lib/core/pollfd.c | 566 |
1 files changed, 566 insertions, 0 deletions
diff --git a/thirdparty/libwebsockets/lib/core/pollfd.c b/thirdparty/libwebsockets/lib/core/pollfd.c new file mode 100644 index 0000000000..834298577a --- /dev/null +++ b/thirdparty/libwebsockets/lib/core/pollfd.c @@ -0,0 +1,566 @@ +/* + * libwebsockets - small server side websockets and web server implementation + * + * Copyright (C) 2010-2017 Andy Green <andy@warmcat.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation: + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301 USA + */ + +#include "core/private.h" + +int +_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa) +{ +#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT) + volatile struct lws_context_per_thread *vpt; +#endif + struct lws_context_per_thread *pt; + struct lws_context *context; + int ret = 0, pa_events = 1; + struct lws_pollfd *pfd; + int sampled_tid, tid; + + if (!wsi) + return 0; + + assert(wsi->position_in_fds_table == LWS_NO_FDS_POS || + wsi->position_in_fds_table >= 0); + + if (wsi->position_in_fds_table == LWS_NO_FDS_POS) + return 0; + + if (((volatile struct lws *)wsi)->handling_pollout && + !_and && _or == LWS_POLLOUT) { + /* + * Happening alongside service thread handling POLLOUT. + * The danger is when he is finished, he will disable POLLOUT, + * countermanding what we changed here. + * + * Instead of changing the fds, inform the service thread + * what happened, and ask it to leave POLLOUT active on exit + */ + ((volatile struct lws *)wsi)->leave_pollout_active = 1; + /* + * by definition service thread is not in poll wait, so no need + * to cancel service + */ + + lwsl_debug("%s: using leave_pollout_active\n", __func__); + + return 0; + } + + context = wsi->context; + pt = &context->pt[(int)wsi->tsi]; + + assert(wsi->position_in_fds_table < (int)pt->fds_count); + +#if !defined(LWS_WITH_LIBUV) && \ + !defined(LWS_WITH_LIBEV) && \ + !defined(LWS_WITH_LIBEVENT) + /* + * This only applies when we use the default poll() event loop. + * + * BSD can revert pa->events at any time, when the kernel decides to + * exit from poll(). We can't protect against it using locking. + * + * Therefore we must check first if the service thread is in poll() + * wait; if so, we know we must be being called from a foreign thread, + * and we must keep a strictly ordered list of changes we made instead + * of trying to apply them, since when poll() exits, which may happen + * at any time it would revert our changes. + * + * The plat code will apply them when it leaves the poll() wait + * before doing anything else. + */ + + vpt = (volatile struct lws_context_per_thread *)pt; + + vpt->foreign_spinlock = 1; + lws_memory_barrier(); + + if (vpt->inside_poll) { + struct lws_foreign_thread_pollfd *ftp, **ftp1; + /* + * We are certainly a foreign thread trying to change events + * while the service thread is in the poll() wait. + * + * Create a list of changes to be applied after poll() exit, + * instead of trying to apply them now. + */ + ftp = lws_malloc(sizeof(*ftp), "ftp"); + if (!ftp) { + vpt->foreign_spinlock = 0; + lws_memory_barrier(); + ret = -1; + goto bail; + } + + ftp->_and = _and; + ftp->_or = _or; + ftp->fd_index = wsi->position_in_fds_table; + ftp->next = NULL; + + /* place at END of list to maintain order */ + ftp1 = (struct lws_foreign_thread_pollfd **) + &vpt->foreign_pfd_list; + while (*ftp1) + ftp1 = &((*ftp1)->next); + + *ftp1 = ftp; + vpt->foreign_spinlock = 0; + lws_memory_barrier(); + lws_cancel_service_pt(wsi); + + return 0; + } + + vpt->foreign_spinlock = 0; + lws_memory_barrier(); +#endif + + pfd = &pt->fds[wsi->position_in_fds_table]; + pa->fd = wsi->desc.sockfd; + lwsl_debug("%s: wsi %p: fd %d events %d -> %d\n", __func__, wsi, + pa->fd, pfd->events, (pfd->events & ~_and) | _or); + pa->prev_events = pfd->events; + pa->events = pfd->events = (pfd->events & ~_and) | _or; + + if (wsi->http2_substream) + return 0; + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, + LWS_CALLBACK_CHANGE_MODE_POLL_FD, + wsi->user_space, (void *)pa, 0)) { + ret = -1; + goto bail; + } + + if (context->event_loop_ops->io) { + if (_and & LWS_POLLIN) + context->event_loop_ops->io(wsi, + LWS_EV_STOP | LWS_EV_READ); + + if (_or & LWS_POLLIN) + context->event_loop_ops->io(wsi, + LWS_EV_START | LWS_EV_READ); + + if (_and & LWS_POLLOUT) + context->event_loop_ops->io(wsi, + LWS_EV_STOP | LWS_EV_WRITE); + + if (_or & LWS_POLLOUT) + context->event_loop_ops->io(wsi, + LWS_EV_START | LWS_EV_WRITE); + } + + /* + * if we changed something in this pollfd... + * ... and we're running in a different thread context + * than the service thread... + * ... and the service thread is waiting ... + * then cancel it to force a restart with our changed events + */ + pa_events = pa->prev_events != pa->events; + + if (pa_events) { + if (lws_plat_change_pollfd(context, wsi, pfd)) { + lwsl_info("%s failed\n", __func__); + ret = -1; + goto bail; + } + sampled_tid = pt->service_tid; + if (sampled_tid && wsi->vhost) { + tid = wsi->vhost->protocols[0].callback(wsi, + LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); + if (tid == -1) { + ret = -1; + goto bail; + } + if (tid != sampled_tid) + lws_cancel_service_pt(wsi); + } + } + +bail: + return ret; +} + +#ifndef LWS_NO_SERVER +/* + * Enable or disable listen sockets on this pt globally... + * it's modulated according to the pt having space for a new accept. + */ +static void +lws_accept_modulation(struct lws_context *context, + struct lws_context_per_thread *pt, int allow) +{ + struct lws_vhost *vh = context->vhost_list; + struct lws_pollargs pa1; + + while (vh) { + if (vh->lserv_wsi) { + if (allow) + _lws_change_pollfd(vh->lserv_wsi, + 0, LWS_POLLIN, &pa1); + else + _lws_change_pollfd(vh->lserv_wsi, + LWS_POLLIN, 0, &pa1); + } + vh = vh->vhost_next; + } +} +#endif + +int +__insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) +{ + struct lws_pollargs pa = { wsi->desc.sockfd, LWS_POLLIN, 0 }; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + int ret = 0; + + + lwsl_debug("%s: %p: tsi=%d, sock=%d, pos-in-fds=%d\n", + __func__, wsi, wsi->tsi, wsi->desc.sockfd, pt->fds_count); + + if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) { + lwsl_err("Too many fds (%d vs %d)\n", context->max_fds, + context->fd_limit_per_thread ); + return 1; + } + +#if !defined(_WIN32) + if (wsi->desc.sockfd - lws_plat_socket_offset() >= context->max_fds) { + lwsl_err("Socket fd %d is too high (%d) offset %d\n", + wsi->desc.sockfd, context->max_fds, + lws_plat_socket_offset()); + return 1; + } +#endif + + assert(wsi); + assert(wsi->event_pipe || wsi->vhost); + assert(lws_socket_is_valid(wsi->desc.sockfd)); + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + wsi->user_space, (void *) &pa, 1)) + return -1; + + pt->count_conns++; + insert_wsi(context, wsi); + wsi->position_in_fds_table = pt->fds_count; + + pt->fds[wsi->position_in_fds_table].fd = wsi->desc.sockfd; + pt->fds[wsi->position_in_fds_table].events = LWS_POLLIN; + pa.events = pt->fds[pt->fds_count].events; + + lws_plat_insert_socket_into_fds(context, wsi); + + /* external POLL support via protocol 0 */ + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD, + wsi->user_space, (void *) &pa, 0)) + ret = -1; +#ifndef LWS_NO_SERVER + /* if no more room, defeat accepts on this thread */ + if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1) + lws_accept_modulation(context, pt, 0); +#endif + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + wsi->user_space, (void *)&pa, 1)) + ret = -1; + + return ret; +} + +int +__remove_wsi_socket_from_fds(struct lws *wsi) +{ + struct lws_context *context = wsi->context; + struct lws_pollargs pa = { wsi->desc.sockfd, 0, 0 }; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + struct lws *end_wsi; + int v; + int m, ret = 0; + +#if !defined(_WIN32) + if (wsi->desc.sockfd - lws_plat_socket_offset() > context->max_fds) { + lwsl_err("fd %d too high (%d)\n", wsi->desc.sockfd, + context->max_fds); + return 1; + } +#endif + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + wsi->user_space, (void *)&pa, 1)) + return -1; + + lws_same_vh_protocol_remove(wsi); + + /* the guy who is to be deleted's slot index in pt->fds */ + m = wsi->position_in_fds_table; + + /* these are the only valid possibilities for position_in_fds_table */ + assert(m == LWS_NO_FDS_POS || (m >= 0 && + (unsigned int)m < pt->fds_count)); + + if (context->event_loop_ops->io) + context->event_loop_ops->io(wsi, + LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE | + LWS_EV_PREPARE_DELETION); + + lwsl_debug("%s: wsi=%p, skt=%d, fds pos=%d, end guy pos=%d, endfd=%d\n", + __func__, wsi, wsi->desc.sockfd, wsi->position_in_fds_table, + pt->fds_count, pt->fds[pt->fds_count].fd); + + if (m != LWS_NO_FDS_POS) { + + /* have the last guy take up the now vacant slot */ + pt->fds[m] = pt->fds[pt->fds_count - 1]; + /* this decrements pt->fds_count */ + lws_plat_delete_socket_from_fds(context, wsi, m); + pt->count_conns--; + v = (int) pt->fds[m].fd; + /* end guy's "position in fds table" is now the deletion + * guy's old one */ + end_wsi = wsi_from_fd(context, v); + if (!end_wsi) { + lwsl_err("no wsi for fd %d pos %d, pt->fds_count=%d\n", + (int)pt->fds[m].fd, m, pt->fds_count); + assert(0); + } else + end_wsi->position_in_fds_table = m; + + /* deletion guy's lws_lookup entry needs nuking */ + delete_from_fd(context, wsi->desc.sockfd); + + /* removed wsi has no position any more */ + wsi->position_in_fds_table = LWS_NO_FDS_POS; + } + + /* remove also from external POLL support via protocol 0 */ + if (lws_socket_is_valid(wsi->desc.sockfd) && wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD, + wsi->user_space, (void *) &pa, 0)) + ret = -1; + +#ifndef LWS_NO_SERVER + if (!context->being_destroyed && + /* if this made some room, accept connects on this thread */ + (unsigned int)pt->fds_count < context->fd_limit_per_thread - 1) + lws_accept_modulation(context, pt, 1); +#endif + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + wsi->user_space, (void *) &pa, 1)) + ret = -1; + + return ret; +} + +int +__lws_change_pollfd(struct lws *wsi, int _and, int _or) +{ + struct lws_context *context; + struct lws_pollargs pa; + int ret = 0; + + if (!wsi || (!wsi->protocol && !wsi->event_pipe) || + wsi->position_in_fds_table == LWS_NO_FDS_POS) + return 0; + + context = lws_get_context(wsi); + if (!context) + return 1; + + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + wsi->user_space, (void *) &pa, 0)) + return -1; + + ret = _lws_change_pollfd(wsi, _and, _or, &pa); + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + wsi->user_space, (void *) &pa, 0)) + ret = -1; + + return ret; +} + +int +lws_change_pollfd(struct lws *wsi, int _and, int _or) +{ + struct lws_context_per_thread *pt; + int ret = 0; + + pt = &wsi->context->pt[(int)wsi->tsi]; + + lws_pt_lock(pt, __func__); + ret = __lws_change_pollfd(wsi, _and, _or); + lws_pt_unlock(pt); + + return ret; +} + +LWS_VISIBLE int +lws_callback_on_writable(struct lws *wsi) +{ + struct lws_context_per_thread *pt; + + if (lwsi_state(wsi) == LRS_SHUTDOWN) + return 0; + + if (wsi->socket_is_permanently_unusable) + return 0; + + pt = &wsi->context->pt[(int)wsi->tsi]; + + lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITEABLE_CB_REQ, 1); +#if defined(LWS_WITH_STATS) + if (!wsi->active_writable_req_us) { + wsi->active_writable_req_us = lws_time_in_microseconds(); + lws_stats_atomic_bump(wsi->context, pt, + LWSSTATS_C_WRITEABLE_CB_EFF_REQ, 1); + } +#endif + + + if (wsi->role_ops->callback_on_writable) { + if (wsi->role_ops->callback_on_writable(wsi)) + return 1; + wsi = lws_get_network_wsi(wsi); + } + + if (wsi->position_in_fds_table == LWS_NO_FDS_POS) { + lwsl_debug("%s: failed to find socket %d\n", __func__, + wsi->desc.sockfd); + return -1; + } + + if (__lws_change_pollfd(wsi, 0, LWS_POLLOUT)) + return -1; + + return 1; +} + + +/* + * stitch protocol choice into the vh protocol linked list + * We always insert ourselves at the start of the list + * + * X <-> B + * X <-> pAn <-> pB + * + * Illegal to attach more than once without detach inbetween + */ +void +lws_same_vh_protocol_insert(struct lws *wsi, int n) +{ + lws_vhost_lock(wsi->vhost); + + if (!lws_dll_is_null(&wsi->same_vh_protocol)) + lws_dll_lws_remove(&wsi->same_vh_protocol); + + lws_dll_lws_add_front(&wsi->same_vh_protocol, + &wsi->vhost->same_vh_protocol_heads[n]); + + lws_vhost_unlock(wsi->vhost); +} + +void +__lws_same_vh_protocol_remove(struct lws *wsi) +{ + if (!lws_dll_is_null(&wsi->same_vh_protocol)) + lws_dll_lws_remove(&wsi->same_vh_protocol); +} + +void +lws_same_vh_protocol_remove(struct lws *wsi) +{ + if (!wsi->vhost) + return; + + lws_vhost_lock(wsi->vhost); + + __lws_same_vh_protocol_remove(wsi); + + lws_vhost_unlock(wsi->vhost); +} + + +LWS_VISIBLE int +lws_callback_on_writable_all_protocol_vhost(const struct lws_vhost *vhost, + const struct lws_protocols *protocol) +{ + struct lws *wsi; + int n; + + if (protocol < vhost->protocols || + protocol >= (vhost->protocols + vhost->count_protocols)) { + lwsl_err("%s: protocol %p is not from vhost %p (%p - %p)\n", + __func__, protocol, vhost->protocols, vhost, + (vhost->protocols + vhost->count_protocols)); + + return -1; + } + + n = (int)(protocol - vhost->protocols); + + lws_start_foreach_dll_safe(struct lws_dll_lws *, d, d1, + vhost->same_vh_protocol_heads[n].next) { + wsi = lws_container_of(d, struct lws, same_vh_protocol); + + assert(wsi->protocol == protocol); + lws_callback_on_writable(wsi); + + } lws_end_foreach_dll_safe(d, d1); + + return 0; +} + +LWS_VISIBLE int +lws_callback_on_writable_all_protocol(const struct lws_context *context, + const struct lws_protocols *protocol) +{ + struct lws_vhost *vhost; + int n; + + if (!context) + return 0; + + vhost = context->vhost_list; + + while (vhost) { + for (n = 0; n < vhost->count_protocols; n++) + if (protocol->callback == + vhost->protocols[n].callback && + !strcmp(protocol->name, vhost->protocols[n].name)) + break; + if (n != vhost->count_protocols) + lws_callback_on_writable_all_protocol_vhost( + vhost, &vhost->protocols[n]); + + vhost = vhost->vhost_next; + } + + return 0; +} |