[PATCH 3/5] fuse: {io-uring} Use bitmaps to track queue availability

Add global and per-NUMA node bitmasks, with one bit per queue (CPU),
to track which io-uring queues are available for new requests:

- Global queue availability (avail_q_mask)
- Per-NUMA node queue availability (per_numa_avail_q_mask)
- Global queue registration (registered_q_mask)
- Per-NUMA node queue registration (numa_registered_q_mask)

Note that these bitmasks are not lock protected, so accesses to them
are not guaranteed to be accurate. The goal is only to determine which
queues are approximately idle and might therefore be better suited for
a request.
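
For illustration only (not part of this patch): a later dispatch path
might consult these masks roughly as in the sketch below, preferring an
approximately idle queue on the local NUMA node before falling back to
any available or any registered queue. fuse_uring_pick_queue() and the
exact fallback order are hypothetical here.

	/*
	 * Hypothetical sketch of a consumer of the masks added by this
	 * patch; not part of the patch itself.
	 */
	static unsigned int fuse_uring_pick_queue(struct fuse_ring *ring)
	{
		int node = numa_node_id();
		unsigned int qid;

		/* Clamp the node as done for queue->numa_node in this patch */
		if (node < 0 || node >= ring->nr_numa_nodes)
			node = 0;

		/* Prefer an approximately idle queue on the local node */
		qid = cpumask_any(ring->per_numa_avail_q_mask[node]);
		if (qid < nr_cpu_ids)
			return qid;

		/* Fall back to any approximately idle queue */
		qid = cpumask_any(ring->avail_q_mask);
		if (qid < nr_cpu_ids)
			return qid;

		/* Last resort: any registered queue */
		return cpumask_any(ring->registered_q_mask);
	}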

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
 fs/fuse/dev_uring.c   | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h | 18 ++++++++++
 2 files changed, 117 insertions(+)

diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 0f5ab27dacb66c9f5f10eac2713d9bd3eb4c26da..c2bc20848bc54541ede9286562177994e7ca5879 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -18,6 +18,8 @@ MODULE_PARM_DESC(enable_uring,
 
 #define FUSE_URING_IOV_SEGS 2 /* header and payload */
 
+/* Number of queued fuse requests until a queue is considered full */
+#define FUSE_URING_QUEUE_THRESHOLD 5
 
 bool fuse_uring_enabled(void)
 {
@@ -184,6 +186,25 @@ bool fuse_uring_request_expired(struct fuse_conn *fc)
 	return false;
 }
 
+static void fuse_ring_destruct_q_masks(struct fuse_ring *ring)
+{
+	int node;
+
+	free_cpumask_var(ring->avail_q_mask);
+	if (ring->per_numa_avail_q_mask) {
+		for (node = 0; node < ring->nr_numa_nodes; node++)
+			free_cpumask_var(ring->per_numa_avail_q_mask[node]);
+		kfree(ring->per_numa_avail_q_mask);
+	}
+
+	free_cpumask_var(ring->registered_q_mask);
+	if (ring->numa_registered_q_mask) {
+		for (node = 0; node < ring->nr_numa_nodes; node++)
+			free_cpumask_var(ring->numa_registered_q_mask[node]);
+		kfree(ring->numa_registered_q_mask);
+	}
+}
+
 void fuse_uring_destruct(struct fuse_conn *fc)
 {
 	struct fuse_ring *ring = fc->ring;
@@ -215,11 +236,44 @@ void fuse_uring_destruct(struct fuse_conn *fc)
 		ring->queues[qid] = NULL;
 	}
 
+	fuse_ring_destruct_q_masks(ring);
 	kfree(ring->queues);
 	kfree(ring);
 	fc->ring = NULL;
 }
 
+static int fuse_ring_create_q_masks(struct fuse_ring *ring)
+{
+	if (!zalloc_cpumask_var(&ring->avail_q_mask, GFP_KERNEL_ACCOUNT))
+		return -ENOMEM;
+
+	if (!zalloc_cpumask_var(&ring->registered_q_mask, GFP_KERNEL_ACCOUNT))
+		return -ENOMEM;
+
+	ring->per_numa_avail_q_mask = kcalloc(ring->nr_numa_nodes,
+					      sizeof(struct cpumask *),
+					      GFP_KERNEL_ACCOUNT);
+	if (!ring->per_numa_avail_q_mask)
+		return -ENOMEM;
+	for (int node = 0; node < ring->nr_numa_nodes; node++)
+		if (!zalloc_cpumask_var(&ring->per_numa_avail_q_mask[node],
+					GFP_KERNEL_ACCOUNT))
+			return -ENOMEM;
+
+	ring->numa_registered_q_mask = kcalloc(ring->nr_numa_nodes,
+					       sizeof(struct cpumask *),
+					       GFP_KERNEL_ACCOUNT);
+	if (!ring->numa_registered_q_mask)
+		return -ENOMEM;
+	for (int node = 0; node < ring->nr_numa_nodes; node++) {
+		if (!zalloc_cpumask_var(&ring->numa_registered_q_mask[node],
+					GFP_KERNEL_ACCOUNT))
+			return -ENOMEM;
+	}
+
+	return 0;
+}
+
 /*
  * Basic ring setup for this connection based on the provided configuration
  */
@@ -229,11 +283,14 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	size_t nr_queues = num_possible_cpus();
 	struct fuse_ring *res = NULL;
 	size_t max_payload_size;
+	int err;
 
 	ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT);
 	if (!ring)
 		return NULL;
 
+	ring->nr_numa_nodes = num_online_nodes();
+
 	ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *),
 			       GFP_KERNEL_ACCOUNT);
 	if (!ring->queues)
@@ -242,6 +299,10 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
 	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);
 
+	err = fuse_ring_create_q_masks(ring);
+	if (err)
+		goto out_err;
+
 	spin_lock(&fc->lock);
 	if (fc->ring) {
 		/* race, another thread created the ring in the meantime */
@@ -261,6 +322,7 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
 	return ring;
 
 out_err:
+	fuse_ring_destruct_q_masks(ring);
 	kfree(ring->queues);
 	kfree(ring);
 	return res;
@@ -284,6 +346,10 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 
 	queue->qid = qid;
 	queue->ring = ring;
+	queue->numa_node = cpu_to_node(qid);
+	if (unlikely(queue->numa_node < 0 ||
+		     queue->numa_node >= ring->nr_numa_nodes))
+		queue->numa_node = 0;
 	spin_lock_init(&queue->lock);
 
 	INIT_LIST_HEAD(&queue->ent_avail_queue);
@@ -423,6 +489,7 @@ static void fuse_uring_log_ent_state(struct fuse_ring *ring)
 			pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
 				ring, qid, ent, ent->state);
 		}
+
 		spin_unlock(&queue->lock);
 	}
 	ring->stop_debug_log = 1;
@@ -472,11 +539,18 @@ void fuse_uring_stop_queues(struct fuse_ring *ring)
 
 	for (qid = 0; qid < ring->max_nr_queues; qid++) {
 		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
+		int node;
 
 		if (!queue)
 			continue;
 
 		fuse_uring_teardown_entries(queue);
+
+		node = queue->numa_node;
+		cpumask_clear_cpu(qid, ring->registered_q_mask);
+		cpumask_clear_cpu(qid, ring->avail_q_mask);
+		cpumask_clear_cpu(qid, ring->numa_registered_q_mask[node]);
+		cpumask_clear_cpu(qid, ring->per_numa_avail_q_mask[node]);
 	}
 
 	if (atomic_read(&ring->queue_refs) > 0) {
@@ -744,9 +818,18 @@ static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent,
 static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
 				 struct fuse_ring_queue *queue)
 {
+	struct fuse_ring *ring = queue->ring;
+	int node = queue->numa_node;
+
 	WARN_ON_ONCE(!ent->cmd);
 	list_move(&ent->list, &queue->ent_avail_queue);
 	ent->state = FRRS_AVAILABLE;
+
+	if (list_is_singular(&queue->ent_avail_queue) &&
+	    queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) {
+		cpumask_set_cpu(queue->qid, ring->avail_q_mask);
+		cpumask_set_cpu(queue->qid, ring->per_numa_avail_q_mask[node]);
+	}
 }
 
 /* Used to find the request on SQE commit */
@@ -769,6 +852,8 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
 					   struct fuse_req *req)
 {
 	struct fuse_ring_queue *queue = ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	int node = queue->numa_node;
 
 	lockdep_assert_held(&queue->lock);
 
@@ -783,6 +868,16 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
 	ent->state = FRRS_FUSE_REQ;
 	list_move_tail(&ent->list, &queue->ent_w_req_queue);
 	fuse_uring_add_to_pq(ent, req);
+
+	/*
+	 * If there are no more available entries, mark the queue as unavailable
+	 * in both global and per-NUMA node masks
+	 */
+	if (list_empty(&queue->ent_avail_queue)) {
+		cpumask_clear_cpu(queue->qid, ring->avail_q_mask);
+		cpumask_clear_cpu(queue->qid,
+				  ring->per_numa_avail_q_mask[node]);
+	}
 }
 
 /* Fetch the next fuse request if available */
@@ -982,6 +1077,7 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
 	struct fuse_ring *ring = queue->ring;
 	struct fuse_conn *fc = ring->fc;
 	struct fuse_iqueue *fiq = &fc->iq;
+	int node = queue->numa_node;
 
 	fuse_uring_prepare_cancel(cmd, issue_flags, ent);
 
@@ -990,6 +1086,9 @@ static void fuse_uring_do_register(struct fuse_ring_ent *ent,
 	fuse_uring_ent_avail(ent, queue);
 	spin_unlock(&queue->lock);
 
+	cpumask_set_cpu(queue->qid, ring->registered_q_mask);
+	cpumask_set_cpu(queue->qid, ring->numa_registered_q_mask[node]);
+
 	if (!ring->ready) {
 		bool ready = is_ring_ready(ring, queue->qid);
 
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 708412294982566919122a1a0d7f741217c763ce..0457dbc6737c8876dd7a7d4c9c724da05e553e6a 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -66,6 +66,9 @@ struct fuse_ring_queue {
 	/* queue id, corresponds to the cpu core */
 	unsigned int qid;
 
+	/* NUMA node this queue belongs to */
+	int numa_node;
+
 	/*
 	 * queue lock, taken when any value in the queue changes _and_ also
 	 * a ring entry state changes.
@@ -115,6 +118,9 @@ struct fuse_ring {
 	/* number of ring queues */
 	size_t max_nr_queues;
 
+	/* number of numa nodes */
+	int nr_numa_nodes;
+
 	/* maximum payload/arg size */
 	size_t max_payload_sz;
 
@@ -125,6 +131,18 @@ struct fuse_ring {
 	 */
 	unsigned int stop_debug_log : 1;
 
+	/* Tracks which queues are available (empty) globally */
+	cpumask_var_t avail_q_mask;
+
+	/* Tracks which queues are available per NUMA node */
+	cpumask_var_t *per_numa_avail_q_mask;
+
+	/* Tracks which queues are registered */
+	cpumask_var_t registered_q_mask;
+
+	/* Tracks which queues are registered per NUMA node */
+	cpumask_var_t *numa_registered_q_mask;
+
 	wait_queue_head_t stop_waitq;
 
 	/* async tear down */

-- 
2.43.0
