Skip to content
Snippets Groups Projects
Commit f9be6e10 authored by Mikhail Khalilov's avatar Mikhail Khalilov
Browse files

add flexio-based Allgather offloading POC

parents
Branches main
No related merge requests found
Showing with 697 additions and 0 deletions
# Allgather receive path offloading POC architecture overview
![design](./design.png "design")
## Processing engines and endpoints
Application client/server process specifies the *layout* of data processing on the send/receive in the two dimensions:
- *Processing Engines (PEs)* are used to execute application datapath code, e.g., to write multi-threaded client/server the user needs to create multiple PEs, where each PE communicates with the network through *Endpoints*. Currently two types of PEs are supported, IB Verbs CQ and DPA CQ, i.e., we back the Processing Element with one CQ.
- *Endpoints (EPs)* that are associated with PEs and represent the object that is used to post send/receive requests. Currently the POC supports three backends for endpoints: IB Verbs RC QP, DPA RC QP, DPA RC QP with staging memory.
![supported configurations](./supported_configs.png "supported configurations")
# Running POC
## Example 1: UC host-based client/server
Single-threaded CPU-driven setup without offloading (i.e., we assume only HW multicast supported).
### host04: client (sender) running on the host CPU
```sh
$ ssh 10.245.43.1
$ $POC_BMARK_BIN --rdma_dev=mlx5_0 --transport=host --epn=1 --pen=1 --payload_size=8388608 --chunk_size=4096 --print_header
```
### host05: server/receiver running on the host CPU
```sh
$ ssh 10.245.43.20
$ $POC_BMARK_BIN --rdma_dev=mlx5_2 --transport=host --epn=1 --pen=1 --payload_size=8388608 --chunk_size=4096 --bmark_client_addr=10.245.43.1
```
## Example 2: UC with the server datapath offloaded to the DPA
The bitmap state update and receive request reposting are offloaded to the DPA.
### host04: client (sender) running on the host CPU
1 processing engine (host CPU thread) spreads data across two endpoints, i.e., 1 CQ is subscribed with 2 QPs.
```sh
$ ssh 10.245.43.1
$ $POC_BMARK_BIN --rdma_dev=mlx5_0 --transport=dpa --epn=2 --pen=1 --payload_size=8388608 --chunk_size=4096 --print_header
```
### host05: server running on the host CPU with offloaded receive datapath
We create 2 processing engine on DPA and 1:1 associate them with endpoints, i.e., 2 DPA CQs are 1:1 mapped to 2 DPA QPs.
```sh
$ ssh 10.245.43.20
$ $POC_BMARK_BIN --rdma_dev=mlx5_2 --transport=dpa --epn=2 --pen=2 --payload_size=8388608 --chunk_size=4096 --bmark_client_addr=10.245.43.1
```
## Example 3: UD with the server-side fully offloaded to the Bluefield.
DPA takes care of reposing receives, writes from the staging memory to the user receive buf, and reliability state update. DPU handles infrastructure (DPA QP creation, staging memory allocation), while the user host CPU thread just posts receive buffer allocated in the host CPU memory.
### host04: client (sender) running on the host CPU
2 processing engines (host CPU threads) spreads data across 16 endpoints, i.e., 2 CQs are subscribed with 8 QPs each.
```sh
$ ssh 10.245.43.1
$ $POC_BMARK_BIN --rdma_dev=mlx5_0 --transport=dpa_dpu_proxy --epn=16 --pen=2 --payload_size=8388608 --chunk_size=4096 --print_header
```
### host05: daemon that emulates host application allocating the receive buffer
```sh
$ ssh 10.245.43.20
$ $POC_BMARK_BIN --hmem_daemon --rdma_dev=mlx5_2
```
### dpu05: server running on the DPU with data-path offloaded to 16 processing engines
Each DPA processing engine is associated with a DPA thread that serves an endpoint, i.e., 16 DPA CQs are 1:1 mapped to 16 DPA QPs.
```sh
$ ssh 10.245.43.20
$ ssh 192.168.100.2 ## we need to run the benchmark from the DPU in order to get staging in DPU DDR
dpu05$ $POC_BMARK_BIN --rdma_dev=mlx5_0 --transport=dpa_dpu_proxy --staging_mem_type=host --epn=16 --pen=16 --payload_size=8388608 --chunk_size=4096 --bmark_client_addr=10.245.43.1 --hmem_daemon_addr=10.245.43.20
```
## Example 4: Run full data collection pipeline (takes many hours)
Data collection script needs to be run twice:
1) from the server host CPU (**mode 0**)
2) from the server DPU to include scenarios with DPU DDR staging (**mode 2**)
### host05/dpu05
*Note:* for now all the paths in this benchmarking script are hard-coded
```sh
nohup python3 -u $POC_ROOT/flexio-sdk/apps/flexio_ag_bench/benchmarking/bmark.py <mode> <logs_output_directory> > bmark.log &
```
This diff is collapsed.
design.png

135 KiB

#pragma once
#include <stdint.h>
#define RD_INSTR(x) __asm__ volatile("rdinstret %0 " \
: "=r"((x)))
#define RD_CYCLE(x) __asm__ volatile("rdcycle %0 " \
: "=r"((x)))
#define MAX(a,b) ((a) > (b) ? a : b)
#define MIN(a,b) ((a) < (b) ? a : b)
#ifdef DEV_CODE
#pragma clang diagnostic ignored "-Wlanguage-extension-token"
#endif
typedef struct dpa_profile_counter
{
uint64_t sum;
uint64_t min, max;
uint32_t count;
} dpa_profile_counter_t;
typedef struct dpa_profile_timer
{
uint64_t cycle_tmp;
uint64_t instr_tmp;
uint8_t flag;
dpa_profile_counter_t cycles;
dpa_profile_counter_t instrs;
} dpa_profile_timer_t;
#ifdef DPA_PROFILE
#define DPA_PROFILE_TRACE_DEPTH 8
typedef struct dpa_profile_trace
{
uint32_t values[DPA_PROFILE_TRACE_DEPTH];
uint32_t counter;
} dpa_profile_trace_t;
#define DPA_PROFILE_TRACE_ADD(info, val) \
{ \
(info).values[(info).counter % DPA_PROFILE_TRACE_DEPTH] = (uint32_t) val; \
(info).counter++; \
}
#define DPA_PROFILE_TRACE_TIME(info) \
{ \
uint64_t tmp; \
RD_CYCLE(tmp); \
DPA_PROFILE_TRACE_ADD(info, tmp); \
}
#define DPA_PROFILE_COUNTER_ADD(info, val) \
{ \
(info).sum += val; \
(info).max = MAX((info).max, val); \
(info).min = MIN((info).min, val); \
(info).count++; \
}
#define DPA_PROFILE_TIMER_LAP(info) \
{ \
uint64_t tmp_cycle, tmp_instr; \
RD_CYCLE(tmp_cycle); \
RD_INSTR(tmp_instr); \
if ((info).flag) \
{ \
uint64_t lap_cycle = tmp_cycle - (info).cycle_tmp; \
uint64_t lap_instr = tmp_instr - (info).instr_tmp; \
DPA_PROFILE_COUNTER_ADD(info.cycles, lap_cycle); \
DPA_PROFILE_COUNTER_ADD(info.instrs, lap_instr); \
} \
(info).cycle_tmp = tmp_cycle; \
(info).instr_tmp = tmp_instr; \
(info).flag = 1; \
}
#define DPA_PROFILE_TRACE_DECLARE(name) dpa_profile_trace_t name
#define DPA_PROFILE_TIMER_DECLARE(name) dpa_profile_timer_t name
#define DPA_PROFILE_COUNTER_DECLARE(name) dpa_profile_counter_t name
#define DPA_PROFILE_TRACE_INIT(info) \
{ \
for (int i=0; i<DPA_PROFILE_TRACE_DEPTH; i++) \
{ \
(info).values[i] = 0; \
} \
info.counter = 0; \
}
#define DPA_PROFILE_COUNTER_INIT(info) \
{ \
info.sum = 0; \
info.min = UINT64_MAX; \
info.max = 0; \
info.count = 0; \
}
#define DPA_PROFILE_TIMER_INIT(info) \
{ \
DPA_PROFILE_COUNTER_INIT((info).cycles);\
DPA_PROFILE_COUNTER_INIT((info).instrs);\
info.flag = 0; \
}
#define DPA_PROFILE_TIMER_START(info) \
{ \
RD_INSTR((info).instr_tmp); \
RD_CYCLE((info).cycle_tmp); \
}
#define DPA_PROFILE_TIMER_STOP(info) \
{ \
uint64_t tmp_cycle, tmp_instr; \
RD_CYCLE(tmp_cycle); \
RD_INSTR(tmp_instr); \
tmp_cycle = (tmp_cycle - (info).cycle_tmp); \
tmp_instr = (tmp_instr - (info).instr_tmp); \
DPA_PROFILE_COUNTER_ADD(info.cycles, tmp_cycle); \
DPA_PROFILE_COUNTER_ADD(info.instrs, tmp_instr); \
}
#define DPA_PROFILE_COUNTER_AVG(info) ((uint64_t) (info.sum / info.count))
#else
#define DPA_PROFILE_TIMER_DECLARE(name)
#define DPA_PROFILE_TIMER_INIT(info)
#define DPA_PROFILE_TIMER_START(info)
#define DPA_PROFILE_TIMER_STOP(info)
#define DPA_PROFILE_COUNTER_AVG(info) 0
#define DPA_PROFILE_TIMER_LAP(info)
#define DPA_PROFILE_COUNTER_DECLARE(info)
#define DPA_PROFILE_COUNTER_INIT(info)
#define DPA_PROFILE_COUNTER_ADD(info, val)
#define DPA_PROFILE_TRACE_TIME(info)
#define DPA_PROFILE_TRACE_ADD(info, val)
#define DPA_PROFILE_TRACE_DECLARE(name)
#define DPA_PROFILE_TRACE_INIT(info)
#endif
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
sample_dev_srcs = name + '_dev.c'
app_common_dir = 'apps/common'
app_out_name = dev_app_name + '.a'
dpacc_mode = '--application'
inc_app_host = run_command(tools , '--list-all',
join_paths(meson.current_source_dir(), '..', 'host', '*_com.hpp'),
check: true).stdout().strip().split('\n')
inc_tgt_app = inc_tgt_app_common + inc_app_host
asm_compile = ['-disable-asm-checks', '-disable-asm-checks-nowarn']
temp_app_target = custom_target('app_' + name,
command: [ dpacc_build_script, dpacc_mode,
'--app_name', dev_app_name,
'--flexio_root', '@SOURCE_ROOT@',
'--build_root', '@BUILD_ROOT@',
'--srcs', '@INPUT@',
'--dpacc_build_dir', '@OUTDIR@',
'--external_cc_options', device_cc_params,
'--additional_include_directories', join_paths(dir_base, app_common_dir),
'--additional_ld_libs', 'app_common_dev',
'--additional_lib_paths', join_paths(project_build_root, app_common_dir),
'--additional_dpacc_options', dpa_cc_options + [asm_compile],
'--debug', debug_dpacc ],
output: app_out_name,
input: sample_dev_srcs,
depends: [tgt_flexio_dev, tgt_app_archive],
depend_files: inc_tgt_app)
temp_dep = declare_dependency(link_args : join_paths(meson.current_build_dir(), app_out_name))
This diff is collapsed.
This diff is collapsed.
#ifndef _DPA_TRANSPORT_HPP_
#define _DPA_TRANSPORT_HPP_
#include <string>
#include <libflexio/flexio.h>
#include "utils.hpp"
#include "ibv_transport.hpp"
#include "flexio_ag_bench_com.hpp"
#define L2V(l) (1UL << (l))
#define L2M(l) (L2V(l) - 1)
#define FAST_LOG2(x) (sizeof(unsigned long)*8 - 1 - __builtin_clzl((unsigned long)(x)))
#define FAST_LOG2_UP(x) (((x) - (1 << FAST_LOG2(x))) ? FAST_LOG2(x) + 1 : FAST_LOG2(x))
#define FLEXIO_CHKERR(x) CHKERR(FLEXIO, x)
#define FLEXIO_CHKERR_PTR(x, ptr) CHKERR_PTR(FLEXIO, x, ptr)
#define FLEXIO_HARTS_PER_CORE 16
#define FLEXIO_HART_NUM(core_num, hart_num_in_core) ((core_num) * FLEXIO_HARTS_PER_CORE + hart_num_in_core)
#define LOG_FLEXIO_CQE_BSIZE 6
#define LOG_FLEXIO_RWQE_BSIZE 4
#define LOG_FLEXIO_SWQE_BSIZE 6
#define LOG_FLEXIO_CQ_BSIZE(log_q_depth) ((log_q_depth) + LOG_FLEXIO_CQE_BSIZE)
#define LOG_FLEXIO_RQ_BSIZE(log_q_depth) ((log_q_depth) + LOG_FLEXIO_RWQE_BSIZE)
const std::size_t flexio_stream_buff_size = 2048 * 2048;
struct dpa_dev_ctx : public ib_dev_ctx
{
dpa_dev_ctx(std::string device_name);
~dpa_dev_ctx();
void stream_flush();
void dev_ctx_flush();
void app_ctx_alloc(std::size_t max_ctx_size);
void app_ctx_free();
void app_ctx_export(uint64_t app_id, void *ctx, std::size_t ctx_size);
uint64_t rpc_call(flexio_func_t &rpc);
struct flexio_uar *_dpa_uar;
struct flexio_process *_process;
struct flexio_outbox *_outbox;
struct flexio_window *_window;
struct flexio_msg_stream *_msg_stream;
struct dpa_export_data _export_data;
flexio_uintptr_t _export_data_daddr;
};
struct dpa_pe_ctx : public pe_ctx
{
dpa_pe_ctx(struct dev_ctx *dev, std::size_t batch_size, std::size_t idx,
std::size_t q_len, bool active = true);
~dpa_pe_ctx();
int poll();
void poll_worker(std::function<bool(struct ibv_wc *wc, struct bmark_ctx &ctx)> cb,
struct bmark_ctx &ctx);
struct dpa_dev_ctx *_dev;
struct flexio_event_handler *_eh;
struct flexio_cq *_cq;
struct dpa_transfer_cq _cq_transfer;
};
struct dpa_rc_ep_ctx : public ep_ctx
{
dpa_rc_ep_ctx(struct dev_ctx *dev, struct pe_ctx *pe, std::size_t idx, std::size_t q_len,
bool rnr_retry,
bool root = true,
bool active_loopback = true,
bool remote_loopback_end = false);
~dpa_rc_ep_ctx();
void connect(struct ep_addr remote_addr);
void post_send(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id, bool unsignaled);
void post_recv(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
// for now we use host2dpa only for thread activation
void post_send_host2dpa(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
void post_recv_host2dpa(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
struct dpa_dev_ctx *_dev;
struct dpa_pe_ctx *_pe;
struct flexio_qp *_qp;
struct ib_cq_pe_ctx *_host_cq;
struct rc_ep_ctx *_host_qp;
struct dpa_pe_ctx *_loopback_dpa_cq;
struct dpa_rc_ep_ctx *_loopback_dpa_qp;
flexio_uintptr_t _wq_daddr;
struct flexio_mkey *_rqd_mkey;
struct dpa_transfer_qp _qp_transfer;
};
struct dpa_rc_staging_ep_ctx : public dpa_rc_ep_ctx
{
enum staging_mem_type {
MEM_TYPE_MEMIC = 0x0,
MEM_TYPE_DPA = 0x1,
MEM_TYPE_HOST = 0x2
};
static int str2memtype(const std::string &str_mem_type);
dpa_rc_staging_ep_ctx(struct dev_ctx *dev, struct pe_ctx *pe,
std::size_t idx, std::size_t q_len,
bool rnr_retry,
int staging_mem_type = MEM_TYPE_HOST,
bool remote_loopback_end = false);
~dpa_rc_staging_ep_ctx();
void connect_to_hmem_daemon(struct ep_addr hmem_daemon_addr);
void connect(struct ep_addr remote_addr) { dpa_rc_ep_ctx::connect(remote_addr); };
void post_recv(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
std::size_t _max_pkt_size;
std::size_t _staging_rq_pi;
std::size_t _staging_rq_size;
struct ibv_dm *_staging_dm;
struct ibv_mr *_staging_mr;
struct flexio_mkey *_staging_dpa_mkey;
uint64_t _staging_addr;
int _staging_mem_type;
};
#endif
This diff is collapsed.
#ifndef __FLEXIO_HELLO_WORLD_COM_H__
#define __FLEXIO_HELLO_WORLD_COM_H__
//#define DPA_DEBUG
//#define DPA_PROFILE
//#define DPA_CQ_POLL_PROFILE
#define DPA_RELIABILITY_BITMAP
#define DPA_RELIABILITY_BITMAP_NAIVE
//#define DPA_RELIABILITY_BITMAP_DEBUG
//#define DPA_RELIABILITY_COUNTER
// ugly
#define MAX_DATAPATH_WIDTH 128 // witdh == how many threads can be in the activated state at once
#define NET_CQQP_ARRAY_BASE_IDX 0
#define LOOPBACK_CQQP_ARRAY_BASE_IDX (NET_CQQP_ARRAY_BASE_IDX + MAX_DATAPATH_WIDTH)
#define HOST_CQQP_ARRAY_BASE_IDX (LOOPBACK_CQQP_ARRAY_BASE_IDX + MAX_DATAPATH_WIDTH)
#define MAX_FLEXIO_CQS (HOST_CQQP_ARRAY_BASE_IDX + MAX_DATAPATH_WIDTH)
#define MAX_FLEXIO_QPS MAX_FLEXIO_CQS
#define MAX_FLEXIO_DEV_QPS HOST_CQQP_ARRAY_BASE_IDX // threads need to access only network and loopback QPs
#define MAX_FLEXIO_DEV_CQS HOST_CQQP_ARRAY_BASE_IDX // threads need to access only network and loopback QPs
#define DPA_WORK_COMPLETED_MAGICNUM 0xDEADBEAF
#define DPA_BRINGUP_TEST_MAGICNUM 46224
#define DPA_MAX_NET_CQ_POLL_BATCH 128
#define DPA_RECVBUF_RELIABILITY_ARRAY_SIZE 1024 // 0.25GB per QP
#define DPA_CQE_NOTIFICATION_FREQUENCY 16
enum dpa_eh_types {
DPA_EH_PINGPONG_SERVER = 0x0,
DPA_EH_TPUT_SERVER = 0x1,
DPA_EH_TPUT_STAGING_SERVER = 0x2,
DPA_EH_TPUT_CLIENT = 0x3,
};
// PRM, Table 212
enum dpa_opcode_types {
DPA_CQE_REQUESTER = 0x0,
DPA_CQE_RESPONDER_WRITE_W_IMM = 0x1,
DPA_CQE_RESPONDER_SEND = 0x2,
DPA_CQE_RESPONDER_SEND_WITH_IMM = 0x3
};
struct dpa_pingpong_server_ctx {
uint32_t cq_id;
uint32_t qp_id;
uint64_t dgram_size;
uint64_t base_info_size;
uint64_t lkey;
uint64_t ptr;
} __attribute__((__packed__, aligned(8)));
typedef struct {
uint32_t cq_number;
struct flexio_dev_cqe64 *cq_ring, *cqe;
uint32_t cq_idx;
uint8_t cq_hw_owner_bit;
uint32_t *cq_dbr;
uint32_t log_cq_depth;
} dpa_cq_ctx_t;
struct dpa_transfer_cq {
uint32_t cq_num;
uint32_t log_cq_depth;
uint32_t ci_idx;
uint32_t poll_batch_size;
uint8_t hw_owner_bit;
uint8_t reserved[7];
flexio_uintptr_t cq_ring_daddr;
flexio_uintptr_t cq_dbr_daddr;
} __attribute__((__packed__, aligned(8)));
struct dpa_transfer_qp {
uint32_t qp_num;
uint32_t sqd_mkey_id;
uint32_t rqd_mkey_id;
uint32_t reserved;
uint32_t log_qp_sq_depth;
uint32_t log_qp_rq_depth;
uint32_t sq_ci_idx;
uint32_t rq_ci_idx;
uint32_t sq_pi_idx;
uint32_t rq_pi_idx;
uint64_t sqd_lkey;
uint64_t rqd_lkey;
flexio_uintptr_t qp_dbr_daddr;
flexio_uintptr_t qp_sq_daddr;
flexio_uintptr_t qp_rq_daddr;
flexio_uintptr_t sqd_daddr;
flexio_uintptr_t rqd_daddr;
} __attribute__((__packed__, aligned(8)));
struct dpa_tput_ctx {
uint32_t worker_id;
uint32_t finisher_id;
uint32_t n_workers;
uint32_t chunk_size;
uint32_t tx_window;
uint32_t iter;
uint64_t to_process;
uint64_t payload_rkey;
uint64_t payload_ptr;
uint64_t ack_lkey;
uint64_t ack_ptr;
uint64_t counter_lkey;
uint64_t counter_ptr;
#ifdef DPA_RELIABILITY_BITMAP
uint64_t recvbuf_bitmaps[MAX_DATAPATH_WIDTH][DPA_RECVBUF_RELIABILITY_ARRAY_SIZE];
#elif defined(DPA_RELIABILITY_COUNTER)
uint32_t gaps_to_fetch[MAX_DATAPATH_WIDTH][DPA_RECVBUF_RELIABILITY_ARRAY_SIZE * 2];
#endif
} __attribute__((__packed__, aligned(8)));
/* Transport data from HOST application to HW application */
struct dpa_export_data {
uint32_t outbox_id;
uint32_t window_id;
struct dpa_transfer_cq cq_transfer[MAX_FLEXIO_DEV_CQS];
struct dpa_transfer_qp qp_transfer[MAX_FLEXIO_DEV_QPS];
uint32_t n_cqs;
uint32_t n_qps;
flexio_uintptr_t verb;
flexio_uintptr_t app_ctx;
} __attribute__((__packed__, aligned(8)));
#endif
This diff is collapsed.
#ifndef _IBV_TRANSPORT_HPP_
#define _IBV_TRANSPORT_HPP_
#include <string>
#include "transport_def.hpp"
#include "utils.hpp"
#define IBV_CHKERR(x) CHKERR(IBV, x)
#define IBV_CHKERR_PTR(x, ptr) CHKERR_PTR(IBV, x, ptr)
extern std::string mlx5_iface_name;
extern const std::size_t mlx5_max_gid_table_entries;
extern const std::size_t mlx5_max_mtu;
extern const int mlx5_default_qkey;
extern const int mlx5_default_pkey_index;
extern const int mlx5_default_port_num;
extern const int mlx5_default_sl;
extern const std::size_t mlx5_if_namesize;
extern const std::size_t max_path_len;
bool ib_ctx_query_params(struct ibv_context *ctx, struct ep_addr &addr);
struct ibv_mr *
ib_mr_reg(struct ibv_pd *pd, void *buf, std::size_t len);
void ib_mr_dereg(struct ibv_mr *mr);
struct ib_dev_ctx : public dev_ctx
{
ib_dev_ctx(const std::string &dev_name);
~ib_dev_ctx();
struct ibv_mr* reg_mr(void *buf, std::size_t len);
void dereg_mr(struct ibv_mr* mr);
struct ibv_context *_ctx;
struct ibv_pd *_pd;
};
struct ib_cq_pe_ctx : public pe_ctx
{
ib_cq_pe_ctx(struct dev_ctx *dev, std::size_t batch_size, std::size_t idx, std::size_t q_len);
~ib_cq_pe_ctx();
int poll();
int poll(struct ibv_wc **wcs);
void poll_worker(std::function<bool(struct ibv_wc *wc, struct bmark_ctx &ctx)> cb,
struct bmark_ctx &ctx);
struct ibv_cq *_cq;
protected:
std::vector<struct ibv_wc> _wcs;
};
struct ud_ep_ctx : public ep_ctx
{
ud_ep_ctx(struct dev_ctx *dev, struct pe_ctx *pe, std::size_t idx, std::size_t q_len);
~ud_ep_ctx();
void connect(struct ep_addr remote_addr);
void post_send(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id, bool unsignaled);
void post_recv(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
protected:
struct ib_dev_ctx *_dev;
struct ib_cq_pe_ctx *_pe;
struct ibv_qp *_qp;
struct ibv_ah * _ah;
};
struct rc_ep_ctx : public ep_ctx
{
rc_ep_ctx(struct dev_ctx *dev, struct pe_ctx *pe, std::size_t idx, std::size_t q_len, bool rnr_retry = false);
~rc_ep_ctx();
void connect(struct ep_addr remote_addr);
void post_send(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id, bool unsignaled);
void post_recv(struct ibv_mr *mr, void *buf, std::size_t len, uint64_t id);
void post_send_batch(struct ibv_mr *smr, void *sbuf, std::size_t len,
std::size_t batch_size, uint64_t id, uint32_t send_start_idx);
void post_write_with_imm(struct ibv_mr *smr, void *sbuf, uint64_t rkey, uint64_t rbuf,
std::size_t len, uint64_t id);
void post_write_with_imm_batch(struct ibv_mr *smr, void *sbuf, uint64_t rkey, uint64_t rbuf,
std::size_t len, std::size_t batch_size, uint64_t id,
uint32_t write_start_idx);
protected:
struct ib_dev_ctx *_dev;
struct ib_cq_pe_ctx *_pe;
struct ibv_qp *_qp;
private:
std::vector<struct ibv_send_wr> _wrs_pool;
std::vector<struct ibv_sge> _sge_pool;
bool _rnr_retry;
};
struct dummy_rc_ep_ctx : public rc_ep_ctx
{
dummy_rc_ep_ctx(struct dev_ctx *dev, struct pe_ctx *pe);
void connect(struct ep_addr remote_addr);
};
#endif
#dpa_app = c.find_library(dev_app_name,
# dirs: dpacc_output,
# required: true)
src = files('ibv_transport.cpp',
'dpa_transport.cpp'
)
cpp = meson.get_compiler('cpp')
executable(name,
[name + '.cpp'],
sources: src,
native: true,
include_directories: [top_inc, app_common_inc],
dependencies: [ibverbs_dep, mlx5_dep, temp_dep],
link_with: [libflexio, libapp_common_host],
link_depends : temp_app_target
)
\ No newline at end of file
#ifndef _TRANSPORT_HPP_
#define _TRANSPORT_HPP_
#include "transport_def.hpp"
#include "dpa_transport.hpp"
#include "ibv_transport.hpp"
#endif
This diff is collapsed.
This diff is collapsed.
#/bin/sh
kill -9 $(pgrep flexio)
apps = [
'flexio_net_multi_process',
'flexio_net_multi_thread',
'flexio_event_handler',
'flexio_error_handling',
'flexio_multi_function',
'flexio_rpc',
'flexio_window',
'flexio_cmd_queue',
'flexio_ag_bench',
'qp_apps',
]
subdir('common')
foreach name: apps
subdir(name)
endforeach
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment