[PATCH 4/5] fuse: {io-uring} Distribute load among queues

So far queue selection always picked the queue corresponding to the
current core. A previous commit introduced bitmaps that track which
queues are available; queue selection can now consult these bitmaps
and fall back to another queue when the current one is loaded.
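
For illustration, the fallback order could be modelled in plain C
roughly as follows. This is a minimal user-space sketch with
simplified stand-ins (struct queue, first_avail(), select_queue(),
THRESHOLD) for the kernel's cpumask API and the fuse structures, not
the actual kernel code:

#define NR_QUEUES	8
#define THRESHOLD	2	/* stand-in for FUSE_URING_QUEUE_THRESHOLD */

struct queue {
	int nr_reqs;	/* requests currently queued */
	int numa_node;	/* NUMA node of this queue's CPU */
};

/* Return the first set bit (first available qid), or -1 if none */
static int first_avail(unsigned long mask)
{
	int qid;

	for (qid = 0; qid < NR_QUEUES; qid++)
		if (mask & (1UL << qid))
			return qid;
	return -1;
}

/*
 * Selection order:
 * 1) the local queue, while it is below the threshold
 * 2) any available queue on the local NUMA node
 * 3) any available queue on any node
 * 4) the (loaded) local queue as last resort
 */
static int select_queue(struct queue *queues, int local_qid,
			unsigned long *numa_mask, unsigned long global_mask)
{
	int qid;

	if (queues[local_qid].nr_reqs <= THRESHOLD)
		return local_qid;

	qid = first_avail(numa_mask[queues[local_qid].numa_node]);
	if (qid >= 0)
		return qid;

	qid = first_avail(global_mask);
	if (qid >= 0)
		return qid;

	return local_qid;
}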

Signed-off-by: Bernd Schubert <bschubert@xxxxxxx>
---
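
Note on the mapping helper added below: fuse_uring_map_qid() picks the
(qid % nr-available)-th set bit of a queue mask, so overflow from a
loaded queue is spread deterministically across whatever queues are
currently marked available. A hypothetical user-space model of the
same arithmetic (map_qid(), the fixed 64-bit mask and
__builtin_popcountl() are illustration only, not the kernel code):

#include <stdio.h>

static int map_qid(int qid, unsigned long mask)
{
	int nr_queues = __builtin_popcountl(mask);
	int nth, cpu;

	if (nr_queues == 0)
		return -1;

	nth = qid % nr_queues;
	for (cpu = 0; cpu < 64; cpu++) {
		if (!(mask & (1UL << cpu)))
			continue;
		if (nth-- == 0)
			return cpu;
	}
	return -1;
}

int main(void)
{
	/* CPUs 1, 4 and 6 available: qid 5 -> 5 % 3 = 2 -> CPU 6 */
	printf("%d\n", map_qid(5, (1UL << 1) | (1UL << 4) | (1UL << 6)));
	return 0;
}
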
 fs/fuse/dev_uring.c | 98 +++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 88 insertions(+), 10 deletions(-)

diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index c2bc20848bc54541ede9286562177994e7ca5879..624f856388e0867f3c3caed6771e61babd076645 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -825,8 +825,7 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
 	list_move(&ent->list, &queue->ent_avail_queue);
 	ent->state = FRRS_AVAILABLE;
 
-	if (list_is_singular(&queue->ent_avail_queue) &&
-	    queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) {
+	if (queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) {
 		cpumask_set_cpu(queue->qid, ring->avail_q_mask);
 		cpumask_set_cpu(queue->qid, ring->per_numa_avail_q_mask[node]);
 	}
@@ -1066,6 +1065,23 @@ static bool is_ring_ready(struct fuse_ring *ring, int current_qid)
 	return ready;
 }
 
+static int fuse_uring_map_qid(int qid, const struct cpumask *mask)
+{
+	int nr_queues = cpumask_weight(mask);
+	int nth, cpu;
+
+	if (nr_queues == 0)
+		return -1;
+
+	nth = qid % nr_queues;
+	for_each_cpu(cpu, mask) {
+		if (nth-- == 0)
+			return cpu;
+	}
+
+	return -1;
+}
+
 /*
  * fuse_uring_req_fetch command handling
  */
@@ -1328,22 +1344,57 @@ static void fuse_uring_send_in_task(struct io_uring_cmd *cmd,
 	fuse_uring_send(ent, cmd, err, issue_flags);
 }
 
-static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring)
+static struct fuse_ring_queue *
+fuse_uring_get_first_queue(struct fuse_ring *ring, const struct cpumask *mask)
+{
+	int qid;
+
+	/* Find the first available CPU in this mask */
+	qid = cpumask_first(mask);
+
+	/* Check if we found a valid CPU */
+	if (qid >= ring->max_nr_queues)
+		return NULL; /* No available queues */
+
+	/* Bits in both the global and per-NUMA masks are global qids */
+	return ring->queues[qid];
+}
+
+/*
+ * Get the best queue for the current CPU
+ */
+static struct fuse_ring_queue *fuse_uring_get_queue(struct fuse_ring *ring)
 {
 	unsigned int qid;
-	struct fuse_ring_queue *queue;
+	struct fuse_ring_queue *queue, *local_queue;
+	int local_node;
+	struct cpumask *mask;
 
 	qid = task_cpu(current);
-
 	if (WARN_ONCE(qid >= ring->max_nr_queues,
 		      "Core number (%u) exceeds nr queues (%zu)\n", qid,
 		      ring->max_nr_queues))
 		qid = 0;
+	local_node = cpu_to_node(qid);
 
-	queue = ring->queues[qid];
-	WARN_ONCE(!queue, "Missing queue for qid %d\n", qid);
+	local_queue = queue = ring->queues[qid];
+	if (WARN_ONCE(!queue, "Missing queue for qid %d\n", qid))
+		return NULL;
 
-	return queue;
+	if (queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD)
+		return queue;
+
+	mask = ring->per_numa_avail_q_mask[local_node];
+	queue = fuse_uring_get_first_queue(ring, mask);
+	if (queue)
+		return queue;
+
+	/* Finally, fall back to an available queue on any node */
+	queue = fuse_uring_get_first_queue(ring, ring->avail_q_mask);
+	if (queue)
+		return queue;
+
+	return local_queue;
 }
 
 static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent)
@@ -1364,7 +1415,7 @@ void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
 	int err;
 
 	err = -EINVAL;
-	queue = fuse_uring_task_to_queue(ring);
+	queue = fuse_uring_get_queue(ring);
 	if (!queue)
 		goto err;
 
@@ -1382,6 +1433,19 @@ void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
 				       struct fuse_ring_ent, list);
 	queue->nr_reqs++;
 
+	/*
+	 * Update queue availability based on number of requests
+	 * A queue is considered busy if it has more than
+	 * FUSE_URING_QUEUE_THRESHOLD requests
+	 */
+	if (queue->nr_reqs == FUSE_URING_QUEUE_THRESHOLD + 1) {
+		/* Queue just became busy */
+		cpumask_clear_cpu(queue->qid, ring->avail_q_mask);
+		cpumask_clear_cpu(
+			queue->qid,
+			ring->per_numa_avail_q_mask[queue->numa_node]);
+	}
+
 	if (ent)
 		fuse_uring_add_req_to_ring_ent(ent, req);
 	else
@@ -1409,7 +1473,7 @@ bool fuse_uring_queue_bq_req(struct fuse_req *req)
 	struct fuse_ring_queue *queue;
 	struct fuse_ring_ent *ent = NULL;
 
-	queue = fuse_uring_task_to_queue(ring);
+	queue = fuse_uring_get_queue(ring);
 	if (!queue)
 		return false;
 
@@ -1455,12 +1519,26 @@ bool fuse_uring_queue_bq_req(struct fuse_req *req)
 bool fuse_uring_remove_pending_req(struct fuse_req *req)
 {
 	struct fuse_ring_queue *queue = req->ring_queue;
+	struct fuse_ring *ring = queue->ring;
+	int node = queue->numa_node;
 	bool removed = fuse_remove_pending_req(req, &queue->lock);
 
 	if (removed) {
 		/* Update counters after successful removal */
 		spin_lock(&queue->lock);
 		queue->nr_reqs--;
+
+		/*
+		 * Update queue availability based on number of requests
+		 * A queue is considered available if it has
+		 * FUSE_URING_QUEUE_THRESHOLD or fewer requests
+		 */
+		if (queue->nr_reqs == FUSE_URING_QUEUE_THRESHOLD) {
+			/* Queue just became available */
+			cpumask_set_cpu(queue->qid, ring->avail_q_mask);
+			cpumask_set_cpu(queue->qid,
+					ring->per_numa_avail_q_mask[node]);
+		}
 		spin_unlock(&queue->lock);
 	}
 

-- 
2.43.0




