Add per-CPU and per-NUMA node bitmasks to track which io-uring queues
are available for new requests:

- Global queue availability (avail_q_mask)
- Per-NUMA node queue availability (per_numa_avail_q_mask)
- Global queue registration (registered_q_mask)
- Per-NUMA node queue registration (numa_registered_q_mask)

Note that these bitmasks are not lock protected, so reading them is not
guaranteed to be accurate. The goal is to determine which queues are
approximately idle and might be better suited for a request.

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev_uring.c   | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h | 18 ++++++++++
 2 files changed, 117 insertions(+)
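For illustration only (not part of this patch): a later queue-selection
helper could consult these masks to prefer an approximately idle queue on
the local NUMA node, then any idle queue, then fall back to the current
CPU's queue. The helper name and the fallback order below are assumptions
sketched against the fields added here, not something this series adds:

/*
 * Illustrative sketch, not part of this series: pick an approximately
 * idle queue. Lockless reads of the masks may be stale, which is
 * acceptable since the result is only a placement hint.
 */
static unsigned int fuse_uring_pick_queue(struct fuse_ring *ring)
{
	int node = numa_node_id();
	unsigned int qid;

	/* Prefer an idle queue on the local NUMA node */
	if (node >= 0 && node < ring->nr_numa_nodes) {
		qid = cpumask_first(ring->per_numa_avail_q_mask[node]);
		if (qid < ring->max_nr_queues)
			return qid;
	}

	/* Otherwise any idle queue */
	qid = cpumask_first(ring->avail_q_mask);
	if (qid < ring->max_nr_queues)
		return qid;

	/* No idle queue known, use the current CPU's queue */
	return raw_smp_processor_id();
}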
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 0f5ab27dacb66c9f5f10eac2713d9bd3eb4c26da..c2bc20848bc54541ede9286562177994e7ca5879 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -18,6 +18,8 @@ MODULE_PARM_DESC(enable_uring,
 
 #define FUSE_URING_IOV_SEGS 2 /* header and payload */
 
+/* Number of queued fuse requests until a queue is considered full */
+#define FUSE_URING_QUEUE_THRESHOLD 5
 
 bool fuse_uring_enabled(void)
 {
@@ -184,6 +186,25 @@ bool fuse_uring_request_expired(struct fuse_conn *fc)
 	return false;
 }
 
+static void fuse_ring_destruct_q_masks(struct fuse_ring *ring)
+{
+	int node;
+
+	free_cpumask_var(ring->avail_q_mask);
+	if (ring->per_numa_avail_q_mask) {
+		for (node = 0; node < ring->nr_numa_nodes; node++)
+			free_cpumask_var(ring->per_numa_avail_q_mask[node]);
+		kfree(ring->per_numa_avail_q_mask);
+	}
+
+	free_cpumask_var(ring->registered_q_mask);
+	if (ring->numa_registered_q_mask) {
+		for (node = 0; node < ring->nr_numa_nodes; node++)
+			free_cpumask_var(ring->numa_registered_q_mask[node]);
+		kfree(ring->numa_registered_q_mask);
+	}
+}
+
 void fuse_uring_destruct(struct fuse_conn *fc)
 {
 	struct fuse_ring *ring = fc->ring;
@@ -215,11 +236,44 @@ void fuse_uring_destruct(struct fuse_conn *fc)
 		ring->queues[qid] = NULL;
 	}
 
+	fuse_ring_destruct_q_masks(ring);
 	kfree(ring->queues);
 	kfree(ring);
 	fc->ring = NULL;
 }
 
+static int fuse_ring_create_q_masks(struct fuse_ring *ring)
+{
+	if (!zalloc_cpumask_var(&ring->avail_q_mask, GFP_KERNEL_ACCOUNT))
+		return -ENOMEM;
+
+	if (!zalloc_cpumask_var(&ring->registered_q_mask, GFP_KERNEL_ACCOUNT))
+		return -ENOMEM;
+
+	ring->per_numa_avail_q_mask = kcalloc(ring->nr_numa_nodes,
+					      sizeof(struct cpumask *),
+					      GFP_KERNEL_ACCOUNT);
+	if (!ring->per_numa_avail_q_mask)
+		return -ENOMEM;
+	for (int node = 0; node < ring->nr_numa_nodes; node++)
+		if (!zalloc_cpumask_var(&ring->per_numa_avail_q_mask[node],
+					GFP_KERNEL_ACCOUNT))
+			return -ENOMEM;
+
+	ring->numa_registered_q_mask = kcalloc(ring->nr_numa_nodes,
+					       sizeof(struct cpumask *),
+					       GFP_KERNEL_ACCOUNT);
+	if (!ring->numa_registered_q_mask)
+		return -ENOMEM;
+	for (int node = 0; node < ring->nr_numa_nodes; node++) {
+		if (!zalloc_cpumask_var(&ring->numa_registered_q_mask[node],
+					GFP_KERNEL_ACCOUNT))
+			return -ENOMEM;
+	}
+
+	return 0;
+}
+
 /*
  * Basic ring setup for this connection based on the provided configuration
  */
@@ -229,11 +283,14 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	size_t nr_queues = num_possible_cpus();
 	struct fuse_ring *res = NULL;
 	size_t max_payload_size;
+	int err;
 
 	ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT);
 	if (!ring)
 		return NULL;
 
+	ring->nr_numa_nodes = num_online_nodes();
+
 	ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *),
 			       GFP_KERNEL_ACCOUNT);
 	if (!ring->queues)
 		goto out_err;
@@ -242,6 +299,10 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
 	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);
 
+	err = fuse_ring_create_q_masks(ring);
+	if (err)
+		goto out_err;
+
 	spin_lock(&fc->lock);
 	if (fc->ring) {
 		/* race, another thread created the ring in the meantime */
@@ -261,6 +322,7 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	return ring;
 
 out_err:
+	fuse_ring_destruct_q_masks(ring);
 	kfree(ring->queues);
 	kfree(ring);
 	return res;
@@ -284,6 +346,10 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 
 	queue->qid = qid;
 	queue->ring = ring;
+	queue->numa_node = cpu_to_node(qid);
+	if (unlikely(queue->numa_node < 0 ||
+		     queue->numa_node >= ring->nr_numa_nodes))
+		queue->numa_node = 0;
 	spin_lock_init(&queue->lock);
 
 	INIT_LIST_HEAD(&queue->ent_avail_queue);
@@ -423,6 +489,7 @@ static void fuse_uring_log_ent_state(struct fuse_ring *ring)
 			pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
 				ring, qid, ent, ent->state);
 		}
+		spin_unlock(&queue->lock);
 	}
 	ring->stop_debug_log = 1;
 }
@@ -472,11 +539,18 @@ void fuse_uring_stop_queues(struct fuse_ring *ring)
 
 	for (qid = 0; qid < ring->max_nr_queues; qid++) {
 		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
+		int node;
 
 		if (!queue)
 			continue;
 
 		fuse_uring_teardown_entries(queue);
+
+		node = queue->numa_node;
+		cpumask_clear_cpu(qid, ring->registered_q_mask);
+		cpumask_clear_cpu(qid, ring->avail_q_mask);
+		cpumask_clear_cpu(qid, ring->numa_registered_q_mask[node]);
+		cpumask_clear_cpu(qid, ring->per_numa_avail_q_mask[node]);
 	}
 
 	if (atomic_read(&ring->queue_refs) > 0) {
@@ -744,9 +818,18 @@ static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent,
 static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
 				 struct fuse_ring_queue *queue)
 {
+	struct fuse_ring *ring = queue->ring;
+	int node = queue->numa_node;
+
 	WARN_ON_ONCE(!ent->cmd);
 	list_move(&ent->list, &queue->ent_avail_queue);
 	ent->state = FRRS_AVAILABLE;
+
+	if (list_is_singular(&queue->ent_avail_queue) &&
+	    queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) {
+		cpumask_set_cpu(queue->qid, ring->avail_q_mask);
+		cpumask_set_cpu(queue->qid, ring->per_numa_avail_q_mask[node]);
+	}
 }
 
 /* Used to find the request on SQE commit */
@@ -769,6 +852,8 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
 					   struct fuse_req *req)
 {
 	struct fuse_ring_queue *queue = ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	int node = queue->numa_node;
 
 	lockdep_assert_held(&queue->lock);
 
@@ -783,6 +868,16 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
 	ent->state = FRRS_FUSE_REQ;
 	list_move_tail(&ent->list, &queue->ent_w_req_queue);
 	fuse_uring_add_to_pq(ent, req);
+
+	/*
+	 * If there are no more available entries, mark the queue as unavailable
+	 * in both global and per-NUMA node masks
+	 */
+	if (list_empty(&queue->ent_avail_queue)) {
+		cpumask_clear_cpu(queue->qid, ring->avail_q_mask);
+		cpumask_clear_cpu(queue->qid,
+				  ring->per_numa_avail_q_mask[node]);
+	}
 }
 
 /* Fetch the next fuse request if available */
@@ -982,6 +1077,7 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
 	struct fuse_ring *ring = queue->ring;
 	struct fuse_conn *fc = ring->fc;
 	struct fuse_iqueue *fiq = &fc->iq;
+	int node = queue->numa_node;
 
 	fuse_uring_prepare_cancel(cmd, issue_flags, ent);
 
@@ -990,6 +1086,9 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
 	fuse_uring_ent_avail(ent, queue);
 	spin_unlock(&queue->lock);
 
+	cpumask_set_cpu(queue->qid, ring->registered_q_mask);
+	cpumask_set_cpu(queue->qid, ring->numa_registered_q_mask[node]);
+
 	if (!ring->ready) {
 		bool ready = is_ring_ready(ring, queue->qid);
 
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 708412294982566919122a1a0d7f741217c763ce..0457dbc6737c8876dd7a7d4c9c724da05e553e6a 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -66,6 +66,9 @@ struct fuse_ring_queue {
 	/* queue id, corresponds to the cpu core */
 	unsigned int qid;
 
+	/* NUMA node this queue belongs to */
+	int numa_node;
+
 	/*
 	 * queue lock, taken when any value in the queue changes _and_ also
 	 * a ring entry state changes.
@@ -115,6 +118,9 @@ struct fuse_ring {
 	/* number of ring queues */
 	size_t max_nr_queues;
 
+	/* number of numa nodes */
+	int nr_numa_nodes;
+
 	/* maximum payload/arg size */
 	size_t max_payload_sz;
 
@@ -125,6 +131,18 @@ struct fuse_ring {
 	 */
 	unsigned int stop_debug_log : 1;
 
+	/* Tracks which queues are available (empty) globally */
+	cpumask_var_t avail_q_mask;
+
+	/* Tracks which queues are available per NUMA node */
+	cpumask_var_t *per_numa_avail_q_mask;
+
+	/* Tracks which queues are registered */
+	cpumask_var_t registered_q_mask;
+
+	/* Tracks which queues are registered per NUMA node */
+	cpumask_var_t *numa_registered_q_mask;
+
 	wait_queue_head_t stop_waitq;
 
 	/* async tear down */
-- 
2.43.0