Re: [PATCH v3 bpf] xsk: fix immature cq descriptor production

On Wed, 6 Aug 2025 at 17:41, Maciej Fijalkowski
<maciej.fijalkowski@xxxxxxxxx> wrote:
>
> Eryk reported an issue, referenced in the Closes: tag, where umem
> addrs are prematurely produced onto the pool's completion queue.
> Let us make the skb's destructor responsible for producing all addrs
> that a given skb used.
>
> Introduce struct xsk_addrs, which carries the descriptor count along
> with an array of addresses taken from the processed descriptors, and
> pass a pointer to it via skb_shared_info::destructor_arg. This way we
> can refer to it within xsk_destruct_skb(). In order to mitigate the
> overhead of the memory allocations, introduce a kmem_cache of
> xsk_addrs on xdp_sock. Utilize the existing struct hole in xdp_sock
> for that.
>
> The commit from the Fixes: tag introduced the buggy behavior; it was
> not broken from day 1, but rather when xsk multi-buffer support got
> introduced.
>
> Fixes: b7f72a30e9ac ("xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path")
> Reported-by: Eryk Kubanski <e.kubanski@xxxxxxxxxxxxxxxxxxx>
> Closes: https://lore.kernel.org/netdev/20250530103456.53564-1-e.kubanski@xxxxxxxxxxxxxxxxxxx/
> Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@xxxxxxxxx>
> ---
> v1:
> https://lore.kernel.org/bpf/20250702101648.1942562-1-maciej.fijalkowski@xxxxxxxxx/
> v2:
> https://lore.kernel.org/bpf/20250705135512.1963216-1-maciej.fijalkowski@xxxxxxxxx/
>
> v1->v2:
> * store addrs in an array carried via destructor_arg instead of
>   having them stored in skb headroom; cleaner and less hacky approach;
> v2->v3:
> * use kmem_cache for xsk_addrs allocation (Stan/Olek)
> * set err when xsk_addrs allocation fails (Dan)
> * change xsk_addrs layout to avoid holes
> * free xsk_addrs on error path
> * rebase
> ---
>  include/net/xdp_sock.h |  1 +
>  net/xdp/xsk.c          | 94 ++++++++++++++++++++++++++++++++++--------
>  net/xdp/xsk_queue.h    | 12 ++++++
>  3 files changed, 89 insertions(+), 18 deletions(-)
>
> diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
> index ce587a225661..5ba9ad4c110f 100644
> --- a/include/net/xdp_sock.h
> +++ b/include/net/xdp_sock.h
> @@ -61,6 +61,7 @@ struct xdp_sock {
>                 XSK_BOUND,
>                 XSK_UNBOUND,
>         } state;
> +       struct kmem_cache *xsk_addrs_cache;
>
>         struct xsk_queue *tx ____cacheline_aligned_in_smp;
>         struct list_head tx_list;
> diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
> index 9c3acecc14b1..d77cde0131be 100644
> --- a/net/xdp/xsk.c
> +++ b/net/xdp/xsk.c
> @@ -36,6 +36,11 @@
>  #define TX_BATCH_SIZE 32
>  #define MAX_PER_SOCKET_BUDGET 32
>
> +struct xsk_addrs {
> +       u64 addrs[MAX_SKB_FRAGS + 1];
> +       u32 num_descs;
> +};

As you will have to produce a v4, I suggest that you put num_descs
first in this struct. The current layout will lead to two cache line
accesses for the common case where we have only one fragment. By
putting num_descs first, that will be reduced to one cache line
access. Yes, we will create a hole, but I think wasting 4 bytes here
is worth it. What do you think?
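
Something along these lines is what I have in mind (an untested
sketch, just to illustrate the suggested layout):

struct xsk_addrs {
	u32 num_descs;			/* read together with addrs[0] in the destructor */
	/* 4-byte hole on 64-bit here, the trade-off mentioned above */
	u64 addrs[MAX_SKB_FRAGS + 1];	/* one umem addr per produced descriptor */
};

num_descs and addrs[0] then land in the first 16 bytes of the object,
so with the SLAB_HWCACHE_ALIGN cache you create, the single-frag
destructor path only touches one cache line.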

Apart from that, looks good.

Acked-by: Magnus Karlsson <magnus.karlsson@xxxxxxxxx>

> +
>  void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
>  {
>         if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
> @@ -532,25 +537,39 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
>         return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
>  }
>
> -static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
> +static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
>  {
>         unsigned long flags;
>         int ret;
>
>         spin_lock_irqsave(&pool->cq_lock, flags);
> -       ret = xskq_prod_reserve_addr(pool->cq, addr);
> +       ret = xskq_prod_reserve(pool->cq);
>         spin_unlock_irqrestore(&pool->cq_lock, flags);
>
>         return ret;
>  }
>
> -static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
> +static void xsk_cq_submit_addr_locked(struct xdp_sock *xs,
> +                                     struct sk_buff *skb)
>  {
> +       struct xsk_buff_pool *pool = xs->pool;
> +       struct xsk_addrs *xsk_addrs;
>         unsigned long flags;
> +       u32 num_desc, i;
> +       u32 idx;
> +
> +       xsk_addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
> +       num_desc = xsk_addrs->num_descs;
>
>         spin_lock_irqsave(&pool->cq_lock, flags);
> -       xskq_prod_submit_n(pool->cq, n);
> +       idx = xskq_get_prod(pool->cq);
> +
> +       for (i = 0; i < num_desc; i++, idx++)
> +               xskq_prod_write_addr(pool->cq, idx, xsk_addrs->addrs[i]);
> +       xskq_prod_submit_n(pool->cq, num_desc);
> +
>         spin_unlock_irqrestore(&pool->cq_lock, flags);
> +       kmem_cache_free(xs->xsk_addrs_cache, xsk_addrs);
>  }
>
>  static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
> @@ -562,35 +581,45 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
>         spin_unlock_irqrestore(&pool->cq_lock, flags);
>  }
>
> -static u32 xsk_get_num_desc(struct sk_buff *skb)
> -{
> -       return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
> -}
> -
>  static void xsk_destruct_skb(struct sk_buff *skb)
>  {
>         struct xsk_tx_metadata_compl *compl = &skb_shinfo(skb)->xsk_meta;
>
> -       if (compl->tx_timestamp) {
> +       if (compl->tx_timestamp)
>                 /* sw completion timestamp, not a real one */
>                 *compl->tx_timestamp = ktime_get_tai_fast_ns();
> -       }
>
> -       xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
> +       xsk_cq_submit_addr_locked(xdp_sk(skb->sk), skb);
>         sock_wfree(skb);
>  }
>
> -static void xsk_set_destructor_arg(struct sk_buff *skb)
> +static u32 xsk_get_num_desc(struct sk_buff *skb)
> +{
> +       struct xsk_addrs *addrs;
> +
> +       addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
> +       return addrs->num_descs;
> +}
> +
> +static void xsk_set_destructor_arg(struct sk_buff *skb, struct xsk_addrs *addrs)
>  {
> -       long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
> +       skb_shinfo(skb)->destructor_arg = (void *)addrs;
> +}
> +
> +static void xsk_inc_skb_descs(struct sk_buff *skb)
> +{
> +       struct xsk_addrs *addrs;
>
> -       skb_shinfo(skb)->destructor_arg = (void *)num;
> +       addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
> +       addrs->num_descs++;
>  }
>
>  static void xsk_consume_skb(struct sk_buff *skb)
>  {
>         struct xdp_sock *xs = xdp_sk(skb->sk);
>
> +       kmem_cache_free(xs->xsk_addrs_cache,
> +                       (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg);
>         skb->destructor = sock_wfree;
>         xsk_cq_cancel_locked(xs->pool, xsk_get_num_desc(skb));
>         /* Free skb without triggering the perf drop trace */
> @@ -609,6 +638,7 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
>  {
>         struct xsk_buff_pool *pool = xs->pool;
>         u32 hr, len, ts, offset, copy, copied;
> +       struct xsk_addrs *addrs = NULL;
>         struct sk_buff *skb = xs->skb;
>         struct page *page;
>         void *buffer;
> @@ -623,6 +653,12 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
>                         return ERR_PTR(err);
>
>                 skb_reserve(skb, hr);
> +
> +               addrs = kmem_cache_zalloc(xs->xsk_addrs_cache, GFP_KERNEL);
> +               if (!addrs)
> +                       return ERR_PTR(-ENOMEM);
> +
> +               xsk_set_destructor_arg(skb, addrs);
>         }
>
>         addr = desc->addr;
> @@ -662,6 +698,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
>  {
>         struct xsk_tx_metadata *meta = NULL;
>         struct net_device *dev = xs->dev;
> +       struct xsk_addrs *addrs = NULL;
>         struct sk_buff *skb = xs->skb;
>         bool first_frag = false;
>         int err;
> @@ -694,6 +731,15 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
>                         err = skb_store_bits(skb, 0, buffer, len);
>                         if (unlikely(err))
>                                 goto free_err;
> +
> +                       addrs = kmem_cache_zalloc(xs->xsk_addrs_cache, GFP_KERNEL);
> +                       if (!addrs) {
> +                               err = -ENOMEM;
> +                               goto free_err;
> +                       }
> +
> +                       xsk_set_destructor_arg(skb, addrs);
> +
>                 } else {
>                         int nr_frags = skb_shinfo(skb)->nr_frags;
>                         struct page *page;
> @@ -759,7 +805,9 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
>         skb->mark = READ_ONCE(xs->sk.sk_mark);
>         skb->destructor = xsk_destruct_skb;
>         xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
> -       xsk_set_destructor_arg(skb);
> +
> +       addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
> +       addrs->addrs[addrs->num_descs++] = desc->addr;
>
>         return skb;
>
> @@ -769,7 +817,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
>
>         if (err == -EOVERFLOW) {
>                 /* Drop the packet */
> -               xsk_set_destructor_arg(xs->skb);
> +               xsk_inc_skb_descs(xs->skb);
>                 xsk_drop_skb(xs->skb);
>                 xskq_cons_release(xs->tx);
>         } else {
> @@ -812,7 +860,7 @@ static int __xsk_generic_xmit(struct sock *sk)
>                  * if there is space in it. This avoids having to implement
>                  * any buffering in the Tx path.
>                  */
> -               err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
> +               err = xsk_cq_reserve_locked(xs->pool);
>                 if (err) {
>                         err = -EAGAIN;
>                         goto out;
> @@ -1122,6 +1170,7 @@ static int xsk_release(struct socket *sock)
>         xskq_destroy(xs->tx);
>         xskq_destroy(xs->fq_tmp);
>         xskq_destroy(xs->cq_tmp);
> +       kmem_cache_destroy(xs->xsk_addrs_cache);
>
>         sock_orphan(sk);
>         sock->sk = NULL;
> @@ -1765,6 +1814,15 @@ static int xsk_create(struct net *net, struct socket *sock, int protocol,
>
>         sock_prot_inuse_add(net, &xsk_proto, 1);
>
> +       xs->xsk_addrs_cache = kmem_cache_create("xsk_generic_xmit_cache",
> +                                               sizeof(struct xsk_addrs), 0,
> +                                               SLAB_HWCACHE_ALIGN, NULL);
> +
> +       if (!xs->xsk_addrs_cache) {
> +               sk_free(sk);
> +               return -ENOMEM;
> +       }
> +
>         return 0;
>  }
>
> diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> index 46d87e961ad6..f16f390370dc 100644
> --- a/net/xdp/xsk_queue.h
> +++ b/net/xdp/xsk_queue.h
> @@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
>
>  /* Functions for producers */
>
> +static inline u32 xskq_get_prod(struct xsk_queue *q)
> +{
> +       return READ_ONCE(q->ring->producer);
> +}
> +
>  static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
>  {
>         u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
> @@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
>         return 0;
>  }
>
> +static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
> +{
> +       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> +
> +       ring->desc[idx & q->ring_mask] = addr;
> +}
> +
>  static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
>                                               u32 nb_entries)
>  {
> --
> 2.34.1
>
>



