[PATCH 4/4] rdma: Limit tx_descs_posted to prevent ib_post_send() failure
zhengbing.huang
zhengbing.huang at easystack.cn
Sat Dec 6 09:12:27 CET 2025
RDMA transmit path may encounter ib_post_send() failures with -ENOMEM
if the number of posted tx_descs exceeds tx_descs_max. This can happen
when multiple tx_descs are submitted concurrently without proper tracking
of how many are currently outstanding.
Error log:
kernel: infiniband mlx5_1: mlx5_ib_post_send:1101:(pid 43146):
kernel: drbd drbd1 rdma: ib_post_send() failed -12
To avoid this, we introduce a helper function:
atomic_inc_if_below()
This function ensures that the tx_descs_posted counter is incremented
only while it is still below tx_descs_max (see the sketch after the
list below). It is now used in all relevant paths:
- flow control messages
- tx_desc resend paths (repost)
- normal data/control tx_desc sending
On post failure, the patch ensures tx_descs_posted and peer_rx_descs
are properly rolled back to maintain consistency.
This prevents over-posting and ensures tx_desc submission always
respects the max post queue depth supported by the HCA.
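Roughly, each send path now follows the same reserve/post/rollback
pattern (simplified sketch; the real code below also drops its kref on
the connection and retries on another path instead of returning, and
the -ENOBUFS return here is only illustrative):

  if (atomic_dec_if_positive(&flow->peer_rx_descs) < 0)
          return -ENOBUFS;        /* no receive credit at the peer */

  if (!atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
          atomic_inc(&flow->peer_rx_descs);       /* give the credit back */
          return -ENOBUFS;        /* send queue already full */
  }

  err = __dtr_post_tx_desc(cm, tx_desc);
  if (err) {
          /* undo both reservations so the counters stay consistent */
          atomic_dec(&flow->tx_descs_posted);
          atomic_inc(&flow->peer_rx_descs);
  }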
Signed-off-by: zhengbing.huang <zhengbing.huang at easystack.cn>
---
drbd/drbd_transport_rdma.c | 109 ++++++++++++++++++++++++++-----------
1 file changed, 77 insertions(+), 32 deletions(-)
diff --git a/drbd/drbd_transport_rdma.c b/drbd/drbd_transport_rdma.c
index ce7be2549..0df4fdc3f 100644
--- a/drbd/drbd_transport_rdma.c
+++ b/drbd/drbd_transport_rdma.c
@@ -95,7 +95,7 @@ int allocation_size;
struct dtr_flow_control {
uint32_t magic;
uint32_t new_rx_descs[2];
- uint32_t rx_desc_stolen_from_stream;
+ uint32_t send_from_stream;
} __packed;
/* These numbers are sent within the immediate data value to identify
@@ -538,6 +538,21 @@ static void dtr_control_timer_fn(struct timer_list *t)
drbd_control_event(transport, TIMEOUT);
}
+static bool atomic_inc_if_below(atomic_t *v, int limit)
+{
+ int old, new;
+
+ do {
+ old = atomic_read(v);
+ if (old >= limit)
+ return false;
+
+ new = old + 1;
+ } while (atomic_cmpxchg(v, old, new) != old);
+
+ return true;
+}
+
static int dtr_send(struct dtr_path *path, void *buf, size_t size, gfp_t gfp_mask)
{
struct ib_device *device;
@@ -1454,8 +1469,9 @@ static bool dtr_receive_rx_desc(struct dtr_transport *rdma_transport,
static int dtr_send_flow_control_msg(struct dtr_path *path, gfp_t gfp_mask)
{
struct dtr_flow_control msg;
+ struct dtr_flow *flow;
enum drbd_stream i;
- int err, n[2], rx_desc_stolen_from = -1, rx_descs = 0;
+ int err, n[2], send_from_stream = -1, rx_descs = 0;
msg.magic = cpu_to_be32(DTR_MAGIC);
@@ -1466,19 +1482,21 @@ static int dtr_send_flow_control_msg(struct dtr_path *path, gfp_t gfp_mask)
from rx_descs_known_to_peer has to be atomic!
*/
for (i = DATA_STREAM; i <= CONTROL_STREAM; i++) {
- struct dtr_flow *flow = &path->flow[i];
+ flow = &path->flow[i];
n[i] = dtr_new_rx_descs(flow);
atomic_add(n[i], &flow->rx_descs_known_to_peer);
rx_descs += n[i];
msg.new_rx_descs[i] = cpu_to_be32(n[i]);
- if (rx_desc_stolen_from == -1 && atomic_dec_if_positive(&flow->peer_rx_descs) >= 0)
- rx_desc_stolen_from = i;
+ if (send_from_stream == -1 &&
+ atomic_read(&flow->tx_descs_posted) < flow->tx_descs_max &&
+ atomic_dec_if_positive(&flow->peer_rx_descs) >= 0)
+ send_from_stream = i;
}
spin_unlock_bh(&path->send_flow_control_lock);
- if (rx_desc_stolen_from == -1) {
+ if (send_from_stream == -1) {
struct drbd_transport *transport = path->path.transport;
struct dtr_transport *rdma_transport =
container_of(transport, struct dtr_transport, transport);
@@ -1489,18 +1507,20 @@ static int dtr_send_flow_control_msg(struct dtr_path *path, gfp_t gfp_mask)
goto out_undo;
}
- if (rx_descs == 0) {
- atomic_inc(&path->flow[rx_desc_stolen_from].peer_rx_descs);
+ flow = &path->flow[send_from_stream];
+ if (rx_descs == 0 || !atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
return 0;
}
- msg.rx_desc_stolen_from_stream = cpu_to_be32(rx_desc_stolen_from);
+ msg.send_from_stream = cpu_to_be32(send_from_stream);
err = dtr_send(path, &msg, sizeof(msg), gfp_mask);
if (err) {
- atomic_inc(&path->flow[rx_desc_stolen_from].peer_rx_descs);
- out_undo:
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
+out_undo:
for (i = DATA_STREAM; i <= CONTROL_STREAM; i++) {
- struct dtr_flow *flow = &path->flow[i];
+ flow = &path->flow[i];
atomic_sub(n[i], &flow->rx_descs_known_to_peer);
}
}
@@ -1540,7 +1560,7 @@ static int dtr_got_flow_control_msg(struct dtr_path *path,
clear_bit(NET_CONGESTED, &rdma_transport->transport.flags);
}
- return be32_to_cpu(msg->rx_desc_stolen_from_stream);
+ return be32_to_cpu(msg->send_from_stream);
}
static void dtr_flow_control_tasklet_fn(struct tasklet_struct *t)
@@ -1550,12 +1570,12 @@ static void dtr_flow_control_tasklet_fn(struct tasklet_struct *t)
dtr_send_flow_control_msg(path, GFP_ATOMIC);
}
-static void dtr_maybe_trigger_flow_control_msg(struct dtr_path *path, int rx_desc_stolen_from)
+static void dtr_maybe_trigger_flow_control_msg(struct dtr_path *path, int send_from_stream)
{
struct dtr_flow *flow;
int n;
- flow = &path->flow[rx_desc_stolen_from];
+ flow = &path->flow[send_from_stream];
n = atomic_dec_return(&flow->rx_descs_known_to_peer);
/* If we get a lot of flow control messages in, but no data on this
* path, we need to tell the peer that we recycled all these buffers
@@ -1802,15 +1822,15 @@ static int dtr_handle_rx_cq_event(struct ib_cq *cq, struct dtr_cm *cm)
rx_desc->size = wc.byte_len;
immediate.i = be32_to_cpu(wc.ex.imm_data);
if (immediate.stream == ST_FLOW_CTRL) {
- int rx_desc_stolen_from;
+ int send_from_stream;
ib_dma_sync_single_for_cpu(cm->id->device, rx_desc->sge.addr,
rdma_transport->rx_allocation_size, DMA_FROM_DEVICE);
- rx_desc_stolen_from = dtr_got_flow_control_msg(path, page_address(rx_desc->page));
+ send_from_stream = dtr_got_flow_control_msg(path, page_address(rx_desc->page));
err = dtr_repost_rx_desc(cm, rx_desc);
if (err)
tr_err(&rdma_transport->transport, "dtr_repost_rx_desc() failed %d", err);
- dtr_maybe_trigger_flow_control_msg(path, rx_desc_stolen_from);
+ dtr_maybe_trigger_flow_control_msg(path, send_from_stream);
} else {
struct dtr_flow *flow = &path->flow[immediate.stream];
struct dtr_stream *rdma_stream = &rdma_transport->stream[immediate.stream];
@@ -1905,6 +1925,8 @@ static int dtr_handle_tx_cq_event(struct ib_cq *cq, struct dtr_cm *cm)
struct dtr_path *path = cm->path;
struct dtr_transport *rdma_transport =
container_of(path->path.transport, struct dtr_transport, transport);
+ struct dtr_flow *flow;
+ struct dtr_stream *rdma_stream;
struct dtr_tx_desc *tx_desc;
struct ib_wc wc;
enum dtr_stream_nr stream_nr;
@@ -1916,12 +1938,20 @@ static int dtr_handle_tx_cq_event(struct ib_cq *cq, struct dtr_cm *cm)
tx_desc = (struct dtr_tx_desc *) (unsigned long) wc.wr_id;
stream_nr = tx_desc->imm.stream;
+ if (stream_nr != ST_FLOW_CTRL) {
+ flow = &path->flow[stream_nr];
+ rdma_stream = &rdma_transport->stream[stream_nr];
+ } else {
+ struct dtr_flow_control *msg = (struct dtr_flow_control *)tx_desc->data;
+ enum dtr_stream_nr send_from_stream = be32_to_cpu(msg->send_from_stream);
+ flow = &path->flow[send_from_stream];
+ rdma_stream = &rdma_transport->stream[send_from_stream];
+ }
if (wc.status != IB_WC_SUCCESS || wc.opcode != IB_WC_SEND) {
struct drbd_transport *transport = &rdma_transport->transport;
if (wc.status == IB_WC_RNR_RETRY_EXC_ERR) {
- struct dtr_flow *flow = &path->flow[stream_nr];
tr_err(transport, "tx_event: wc.status = IB_WC_RNR_RETRY_EXC_ERR\n");
tr_info(transport, "peer_rx_descs = %d", atomic_read(&flow->peer_rx_descs));
} else if (wc.status != IB_WC_WR_FLUSH_ERR) {
@@ -1930,6 +1960,7 @@ static int dtr_handle_tx_cq_event(struct ib_cq *cq, struct dtr_cm *cm)
wc.vendor_err, wc.byte_len, wc.ex.imm_data);
}
+ atomic_inc(&flow->peer_rx_descs);
set_bit(DSB_ERROR, &cm->state);
if (stream_nr != ST_FLOW_CTRL) {
@@ -1943,13 +1974,8 @@ static int dtr_handle_tx_cq_event(struct ib_cq *cq, struct dtr_cm *cm)
}
}
- if (stream_nr != ST_FLOW_CTRL) {
- struct dtr_flow *flow = &path->flow[stream_nr];
- struct dtr_stream *rdma_stream = &rdma_transport->stream[stream_nr];
-
- atomic_dec(&flow->tx_descs_posted);
- wake_up_interruptible(&rdma_stream->send_wq);
- }
+ atomic_dec(&flow->tx_descs_posted);
+ wake_up_interruptible(&rdma_stream->send_wq);
if (tx_desc)
dtr_free_tx_desc(cm, tx_desc);
@@ -2385,6 +2411,7 @@ static int dtr_repost_tx_desc(struct dtr_cm *old_cm, struct dtr_tx_desc *tx_desc
container_of(old_cm->path->path.transport, struct dtr_transport, transport);
enum drbd_stream stream = tx_desc->imm.stream;
struct dtr_cm *cm;
+ struct dtr_flow *flow;
int err;
do {
@@ -2399,10 +2426,21 @@ static int dtr_repost_tx_desc(struct dtr_cm *old_cm, struct dtr_tx_desc *tx_desc
continue;
}
+ flow = &cm->path->flow[stream];
+ if (atomic_dec_if_positive(&flow->peer_rx_descs) < 0) {
+ kref_put(&cm->kref, dtr_destroy_cm);
+ continue;
+ }
+ if (!atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ continue;
+ }
+
err = __dtr_post_tx_desc(cm, tx_desc);
- if (!err) {
- struct dtr_flow *flow = &cm->path->flow[stream];
- atomic_inc(&flow->tx_descs_posted);
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
}
kref_put(&cm->kref, dtr_destroy_cm);
} while (err);
@@ -2440,6 +2478,12 @@ retry:
kref_put(&cm->kref, dtr_destroy_cm);
goto retry;
}
+ if (!atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ goto retry;
+ }
+
device = cm->id->device;
switch (tx_desc->type) {
case SEND_PAGE:
@@ -2458,10 +2502,11 @@ retry:
}
err = __dtr_post_tx_desc(cm, tx_desc);
- if (!err)
- atomic_inc(&flow->tx_descs_posted);
- else
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
ib_dma_unmap_page(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ }
out:
--
2.43.0