#include "curl_setup.h"
#include <curl/curl.h>
#include "urldata.h"
#include "url.h"
#include "cfilters.h"
#include "curl_trc.h"
#include "multiif.h"
#include "curlx/timeval.h"
#include "multi_ev.h"
#include "select.h"
#include "uint-bset.h"
#include "uint-spbset.h"
#include "uint-table.h"
#include "curlx/warnless.h"
#include "multihandle.h"
#include "socks.h"
#include "curl_memory.h"
#include "memdebug.h"
static void mev_in_callback(struct Curl_multi *multi, bool value)
{
multi->in_callback = value;
}
struct mev_sh_entry {
struct uint_spbset xfers;
struct connectdata *conn;
void *user_data;
unsigned int action;
unsigned int readers;
unsigned int writers;
BIT(announced);
};
static size_t mev_sh_entry_hash(void *key, size_t key_length, size_t slots_num)
{
curl_socket_t fd = *((curl_socket_t *) key);
(void)key_length;
return (fd % (curl_socket_t)slots_num);
}
static size_t mev_sh_entry_compare(void *k1, size_t k1_len,
void *k2, size_t k2_len)
{
(void)k1_len; (void)k2_len;
return (*((curl_socket_t *) k1)) == (*((curl_socket_t *) k2));
}
static void mev_sh_entry_dtor(void *freethis)
{
struct mev_sh_entry *entry = (struct mev_sh_entry *)freethis;
Curl_uint_spbset_destroy(&entry->xfers);
free(entry);
}
static struct mev_sh_entry *
mev_sh_entry_get(struct Curl_hash *sh, curl_socket_t s)
{
if(s != CURL_SOCKET_BAD) {
return Curl_hash_pick(sh, (char *)&s, sizeof(curl_socket_t));
}
return NULL;
}
static struct mev_sh_entry *
mev_sh_entry_add(struct Curl_hash *sh, curl_socket_t s)
{
struct mev_sh_entry *there = mev_sh_entry_get(sh, s);
struct mev_sh_entry *check;
if(there) {
return there;
}
check = calloc(1, sizeof(struct mev_sh_entry));
if(!check)
return NULL;
Curl_uint_spbset_init(&check->xfers);
if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) {
mev_sh_entry_dtor(check);
return NULL;
}
return check;
}
static void mev_sh_entry_kill(struct Curl_multi *multi, curl_socket_t s)
{
Curl_hash_delete(&multi->ev.sh_entries, (char *)&s, sizeof(curl_socket_t));
}
static size_t mev_sh_entry_user_count(struct mev_sh_entry *e)
{
return Curl_uint_spbset_count(&e->xfers) + (e->conn ? 1 : 0);
}
static bool mev_sh_entry_xfer_known(struct mev_sh_entry *e,
struct Curl_easy *data)
{
return Curl_uint_spbset_contains(&e->xfers, data->mid);
}
static bool mev_sh_entry_conn_known(struct mev_sh_entry *e,
struct connectdata *conn)
{
return (e->conn == conn);
}
static bool mev_sh_entry_xfer_add(struct mev_sh_entry *e,
struct Curl_easy *data)
{
DEBUGASSERT(mev_sh_entry_user_count(e) < 100000);
return Curl_uint_spbset_add(&e->xfers, data->mid);
}
static bool mev_sh_entry_conn_add(struct mev_sh_entry *e,
struct connectdata *conn)
{
DEBUGASSERT(mev_sh_entry_user_count(e) < 100000);
DEBUGASSERT(!e->conn);
if(e->conn)
return FALSE;
e->conn = conn;
return TRUE;
}
static bool mev_sh_entry_xfer_remove(struct mev_sh_entry *e,
struct Curl_easy *data)
{
bool present = Curl_uint_spbset_contains(&e->xfers, data->mid);
if(present)
Curl_uint_spbset_remove(&e->xfers, data->mid);
return present;
}
static bool mev_sh_entry_conn_remove(struct mev_sh_entry *e,
struct connectdata *conn)
{
DEBUGASSERT(e->conn == conn);
if(e->conn == conn) {
e->conn = NULL;
return TRUE;
}
return FALSE;
}
static CURLMcode mev_forget_socket(struct Curl_multi *multi,
struct Curl_easy *data,
curl_socket_t s,
const char *cause)
{
struct mev_sh_entry *entry = mev_sh_entry_get(&multi->ev.sh_entries, s);
int rc = 0;
if(!entry)
return CURLM_OK;
if(entry->announced && multi->socket_cb) {
CURL_TRC_M(data, "ev %s, call(fd=%" FMT_SOCKET_T ", ev=REMOVE)",
cause, s);
mev_in_callback(multi, TRUE);
rc = multi->socket_cb(data, s, CURL_POLL_REMOVE,
multi->socket_userp, entry->user_data);
mev_in_callback(multi, FALSE);
entry->announced = FALSE;
}
mev_sh_entry_kill(multi, s);
if(rc == -1) {
multi->dead = TRUE;
return CURLM_ABORTED_BY_CALLBACK;
}
return CURLM_OK;
}
static CURLMcode mev_sh_entry_update(struct Curl_multi *multi,
struct Curl_easy *data,
struct mev_sh_entry *entry,
curl_socket_t s,
unsigned char last_action,
unsigned char cur_action)
{
int rc, comboaction;
DEBUGASSERT(multi->socket_cb);
if(!multi->socket_cb)
return CURLM_OK;
if(last_action == cur_action)
return CURLM_OK;
if(last_action & CURL_POLL_IN) {
DEBUGASSERT(entry->readers);
if(!(cur_action & CURL_POLL_IN))
entry->readers--;
}
else if(cur_action & CURL_POLL_IN)
entry->readers++;
if(last_action & CURL_POLL_OUT) {
DEBUGASSERT(entry->writers);
if(!(cur_action & CURL_POLL_OUT))
entry->writers--;
}
else if(cur_action & CURL_POLL_OUT)
entry->writers++;
DEBUGASSERT(entry->readers <= mev_sh_entry_user_count(entry));
DEBUGASSERT(entry->writers <= mev_sh_entry_user_count(entry));
DEBUGASSERT(entry->writers + entry->readers);
CURL_TRC_M(data, "ev update fd=%" FMT_SOCKET_T ", action '%s%s' -> '%s%s'"
" (%d/%d r/w)", s,
(last_action & CURL_POLL_IN) ? "IN" : "",
(last_action & CURL_POLL_OUT) ? "OUT" : "",
(cur_action & CURL_POLL_IN) ? "IN" : "",
(cur_action & CURL_POLL_OUT) ? "OUT" : "",
entry->readers, entry->writers);
comboaction = (entry->writers ? CURL_POLL_OUT : 0) |
(entry->readers ? CURL_POLL_IN : 0);
if(((int)entry->action == comboaction))
return CURLM_OK;
CURL_TRC_M(data, "ev update call(fd=%" FMT_SOCKET_T ", ev=%s%s)",
s, (comboaction & CURL_POLL_IN) ? "IN" : "",
(comboaction & CURL_POLL_OUT) ? "OUT" : "");
mev_in_callback(multi, TRUE);
rc = multi->socket_cb(data, s, comboaction, multi->socket_userp,
entry->user_data);
mev_in_callback(multi, FALSE);
entry->announced = TRUE;
if(rc == -1) {
multi->dead = TRUE;
return CURLM_ABORTED_BY_CALLBACK;
}
entry->action = (unsigned int)comboaction;
return CURLM_OK;
}
static CURLMcode mev_pollset_diff(struct Curl_multi *multi,
struct Curl_easy *data,
struct connectdata *conn,
struct easy_pollset *ps,
struct easy_pollset *prev_ps)
{
struct mev_sh_entry *entry;
curl_socket_t s;
unsigned int i, j;
CURLMcode mresult;
DEBUGASSERT(ps);
DEBUGASSERT(prev_ps);
for(i = 0; i < ps->n; i++) {
unsigned char last_action;
bool first_time = FALSE;
s = ps->sockets[i];
entry = mev_sh_entry_get(&multi->ev.sh_entries, s);
if(!entry) {
first_time = TRUE;
entry = mev_sh_entry_add(&multi->ev.sh_entries, s);
if(!entry)
return CURLM_OUT_OF_MEMORY;
CURL_TRC_M(data, "ev new entry fd=%" FMT_SOCKET_T, s);
}
else if(conn) {
first_time = !mev_sh_entry_conn_known(entry, conn);
}
else {
first_time = !mev_sh_entry_xfer_known(entry, data);
}
last_action = 0;
if(first_time) {
if(conn) {
if(!mev_sh_entry_conn_add(entry, conn))
return CURLM_OUT_OF_MEMORY;
}
else {
if(!mev_sh_entry_xfer_add(entry, data))
return CURLM_OUT_OF_MEMORY;
}
CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", added %s #%" FMT_OFF_T
", total=%u/%d (xfer/conn)", s,
conn ? "connection" : "transfer",
conn ? conn->connection_id : data->mid,
Curl_uint_spbset_count(&entry->xfers),
entry->conn ? 1 : 0);
}
else {
for(j = 0; j < prev_ps->n; j++) {
if(s == prev_ps->sockets[j]) {
last_action = prev_ps->actions[j];
break;
}
}
}
mresult = mev_sh_entry_update(multi, data, entry, s,
last_action, ps->actions[i]);
if(mresult)
return mresult;
}
for(i = 0; i < prev_ps->n; i++) {
bool stillused = FALSE;
s = prev_ps->sockets[i];
for(j = 0; j < ps->n; j++) {
if(s == ps->sockets[j]) {
stillused = TRUE;
break;
}
}
if(stillused)
continue;
entry = mev_sh_entry_get(&multi->ev.sh_entries, s);
if(!entry)
continue;
if(conn && !mev_sh_entry_conn_remove(entry, conn)) {
CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", conn lost "
"interest but is not registered", s);
DEBUGASSERT(NULL);
continue;
}
if(!conn && !mev_sh_entry_xfer_remove(entry, data)) {
CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", transfer lost "
"interest but is not registered", s);
DEBUGASSERT(NULL);
continue;
}
if(mev_sh_entry_user_count(entry)) {
mresult = mev_sh_entry_update(multi, data, entry, s,
prev_ps->actions[i], 0);
if(mresult)
return mresult;
CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", removed transfer, "
"total=%u/%d (xfer/conn)", s,
Curl_uint_spbset_count(&entry->xfers),
entry->conn ? 1 : 0);
}
else {
mresult = mev_forget_socket(multi, data, s, "last user gone");
if(mresult)
return mresult;
}
}
Curl_pollset_move(prev_ps, ps);
return CURLM_OK;
}
static void mev_pollset_dtor(void *key, size_t klen, void *entry)
{
struct easy_pollset *ps = entry;
(void)key;
(void)klen;
if(ps) {
Curl_pollset_cleanup(ps);
free(ps);
}
}
static struct easy_pollset*
mev_add_new_conn_pollset(struct connectdata *conn)
{
struct easy_pollset *ps;
ps = Curl_pollset_create();
if(!ps)
return NULL;
if(Curl_conn_meta_set(conn, CURL_META_MEV_POLLSET, ps, mev_pollset_dtor))
return NULL;
return ps;
}
static struct easy_pollset*
mev_add_new_xfer_pollset(struct Curl_easy *data)
{
struct easy_pollset *ps;
ps = Curl_pollset_create();
if(!ps)
return NULL;
if(Curl_meta_set(data, CURL_META_MEV_POLLSET, ps, mev_pollset_dtor))
return NULL;
return ps;
}
static struct easy_pollset *
mev_get_last_pollset(struct Curl_easy *data,
struct connectdata *conn)
{
if(data) {
if(conn)
return Curl_conn_meta_get(conn, CURL_META_MEV_POLLSET);
return Curl_meta_get(data, CURL_META_MEV_POLLSET);
}
return NULL;
}
static CURLMcode mev_assess(struct Curl_multi *multi,
struct Curl_easy *data,
struct connectdata *conn)
{
struct easy_pollset ps, *last_ps;
CURLMcode res = CURLM_OK;
if(!multi || !multi->socket_cb)
return CURLM_OK;
Curl_pollset_init(&ps);
if(conn) {
CURLcode r = Curl_conn_adjust_pollset(data, conn, &ps);
if(r) {
res = (r == CURLE_OUT_OF_MEMORY) ?
CURLM_OUT_OF_MEMORY : CURLM_INTERNAL_ERROR;
goto out;
}
}
else
Curl_multi_pollset(data, &ps, "ev assess");
last_ps = mev_get_last_pollset(data, conn);
if(!last_ps && ps.n) {
if(conn)
last_ps = mev_add_new_conn_pollset(conn);
else
last_ps = mev_add_new_xfer_pollset(data);
if(!last_ps) {
res = CURLM_OUT_OF_MEMORY;
goto out;
}
}
if(last_ps)
res = mev_pollset_diff(multi, data, conn, &ps, last_ps);
else
DEBUGASSERT(!ps.n);
out:
Curl_pollset_cleanup(&ps);
return res;
}
CURLMcode Curl_multi_ev_assess_xfer(struct Curl_multi *multi,
struct Curl_easy *data)
{
return mev_assess(multi, data, NULL);
}
CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi,
struct Curl_easy *data,
struct connectdata *conn)
{
return mev_assess(multi, data, conn);
}
CURLMcode Curl_multi_ev_assess_xfer_bset(struct Curl_multi *multi,
struct uint_bset *set)
{
unsigned int mid;
CURLMcode result = CURLM_OK;
if(multi && multi->socket_cb && Curl_uint_bset_first(set, &mid)) {
do {
struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
if(data)
result = Curl_multi_ev_assess_xfer(multi, data);
}
while(!result && Curl_uint_bset_next(set, mid, &mid));
}
return result;
}
CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi,
curl_socket_t s,
void *user_data)
{
struct mev_sh_entry *e = mev_sh_entry_get(&multi->ev.sh_entries, s);
if(!e)
return CURLM_BAD_SOCKET;
e->user_data = user_data;
return CURLM_OK;
}
void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
curl_socket_t s,
bool *run_cpool)
{
struct mev_sh_entry *entry;
DEBUGASSERT(s != CURL_SOCKET_TIMEOUT);
entry = mev_sh_entry_get(&multi->ev.sh_entries, s);
if(entry) {
struct Curl_easy *data;
unsigned int mid;
if(Curl_uint_spbset_first(&entry->xfers, &mid)) {
do {
data = Curl_multi_get_easy(multi, mid);
if(data) {
Curl_multi_mark_dirty(data);
}
else {
CURL_TRC_M(multi->admin, "socket transfer %u no longer found", mid);
Curl_uint_spbset_remove(&entry->xfers, mid);
}
}
while(Curl_uint_spbset_next(&entry->xfers, mid, &mid));
}
if(entry->conn)
*run_cpool = TRUE;
}
}
void Curl_multi_ev_socket_done(struct Curl_multi *multi,
struct Curl_easy *data, curl_socket_t s)
{
mev_forget_socket(multi, data, s, "socket done");
}
void Curl_multi_ev_xfer_done(struct Curl_multi *multi,
struct Curl_easy *data)
{
DEBUGASSERT(!data->conn);
if(data != multi->admin) {
(void)mev_assess(multi, data, NULL);
Curl_meta_remove(data, CURL_META_MEV_POLLSET);
}
}
void Curl_multi_ev_conn_done(struct Curl_multi *multi,
struct Curl_easy *data,
struct connectdata *conn)
{
(void)mev_assess(multi, data, conn);
Curl_conn_meta_remove(conn, CURL_META_MEV_POLLSET);
}
void Curl_multi_ev_init(struct Curl_multi *multi, size_t hashsize)
{
Curl_hash_init(&multi->ev.sh_entries, hashsize, mev_sh_entry_hash,
mev_sh_entry_compare, mev_sh_entry_dtor);
}
void Curl_multi_ev_cleanup(struct Curl_multi *multi)
{
Curl_hash_destroy(&multi->ev.sh_entries);
}