From e770e116c2400a2613b23803793623a93d8329a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4?= Date: Tue, 22 Jul 2025 11:28:19 +0000 Subject: [PATCH 1/5] feat: deal kaelz4 dst buf error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白凤 --- KAELz4/README.md | 18 ++++++++++++++++++ KAELz4/include/kaelz4.h | 1 + KAELz4/src/kaelz4_adapter.c | 2 +- KAELz4/src/v1/kaelz4_comp.h | 5 +++-- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/KAELz4/README.md b/KAELz4/README.md index 5939e65..ac99063 100644 --- a/KAELz4/README.md +++ b/KAELz4/README.md @@ -137,7 +137,16 @@ int KAELZ4_rebuild_lz77_to_frame(const struct kaelz4_buffer_list *src, struct ka ``` #### 3.1.7、清理session会话 ``` +/** + * @brief: Destroy session and hardware ctx. + * @param: sess : session + */ void KAELZ4_destroy_async_compress_session(void *sess); +/** + * @brief: reset session and hardware ctx, all compress tasks will be canceled. + * @param: sess : session + */ +void KAELZ4_reset_session(void *sess); ``` #### 3.1.8、整体使用示例Demo 本demo使用polling模式接口,将测试文件压缩为lz77_raw数据格式,随后转换成标准lz4的block数据格式,最后通过解压转换为原始文件。 @@ -585,7 +594,16 @@ void KAELZ4_compress_async_polling_in_session(void *sess, int budget); ``` #### 3.2.6、清理session会话 ``` +/** + * @brief: Destroy session and hardware ctx. + * @param: sess : session + */ void KAELZ4_destroy_async_compress_session(void *sess); +/** + * @brief: reset session and hardware ctx, all compress tasks will be canceled. + * @param: sess : session + */ +void KAELZ4_reset_session(void *sess); ``` #### 3.2.7、polling接口整体使用示例Demo 本demo使用polling模式接口,通过初始化session上下文,调用frame格式异步压缩接口, diff --git a/KAELz4/include/kaelz4.h b/KAELz4/include/kaelz4.h index 764e364..47b1e14 100644 --- a/KAELz4/include/kaelz4.h +++ b/KAELz4/include/kaelz4.h @@ -70,6 +70,7 @@ typedef struct { #define KAE_LZ4_ALLOC_FAIL 5 #define KAE_LZ4_SET_FAIL 6 #define KAE_LZ4_HW_TIMEOUT_FAIL 7 +#define KAE_LZ4_DST_BUF_OVERFLOW 8 #define KAE_LZ77_SEQ_DATA_SIZE_PER_64K (128UL * 1024UL) diff --git a/KAELz4/src/kaelz4_adapter.c b/KAELz4/src/kaelz4_adapter.c index 2d0daf4..786326d 100644 --- a/KAELz4/src/kaelz4_adapter.c +++ b/KAELz4/src/kaelz4_adapter.c @@ -714,7 +714,7 @@ static int kaelz4_check_param_valid(const struct kaelz4_buffer_list *src, struct } // now only support dst->buf_bum == 1 - if (unlikely(dst->buf_num != 1 || dst->buf[0].data == NULL || dst->buf[0].buf_len == 0)) { + if (unlikely(dst->buf_num != 1 || dst->buf[0].data == NULL || dst->buf[0].buf_len == 0 || src->buf_num <= 0)) { return KAE_LZ4_INVAL_PARA; } diff --git a/KAELz4/src/v1/kaelz4_comp.h b/KAELz4/src/v1/kaelz4_comp.h index 41187cf..e756d8f 100644 --- a/KAELz4/src/v1/kaelz4_comp.h +++ b/KAELz4/src/v1/kaelz4_comp.h @@ -56,6 +56,9 @@ struct kaelz4_priv_save_info { unsigned int prev_last_lit_buf_index; // 用户输入数据>64K需要分块、返回BLOCK格式、现有保序返回切块压缩结果的约束下,记录前一个分块的last literal信息 const struct kaelz4_buffer_list *src; LZ4F_preferences_t preferences; + int *status; + size_t dstCapacity; + size_t dst_len; }; typedef int (*kaelz4_post_process_handle_t)(struct kaelz4_async_req *req, const struct wd_buf_list *source, @@ -63,8 +66,6 @@ typedef int (*kaelz4_post_process_handle_t)(struct kaelz4_async_req *req, const struct kaelz4_compress_ctx { size_t srcSize; - size_t dstCapacity; - size_t dst_len; const struct kaelz4_buffer_list *src; struct kaelz4_buffer_list *dst; struct kaelz4_priv_save_info save_info; -- Gitee From 88395b515e545229e8391ea759ded26eb9e565be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4?= Date: Tue, 22 Jul 2025 11:29:11 +0000 Subject: [PATCH 2/5] feat: deal kaelz4 dst buf error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白凤 --- KAELz4/src/v1/kaelz4_comp.c | 75 ++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 21 deletions(-) diff --git a/KAELz4/src/v1/kaelz4_comp.c b/KAELz4/src/v1/kaelz4_comp.c index 04919cc..4c8e351 100644 --- a/KAELz4/src/v1/kaelz4_comp.c +++ b/KAELz4/src/v1/kaelz4_comp.c @@ -274,7 +274,7 @@ static void kaelz4_compress_async_callback(struct kaelz4_compress_ctx *compress_ { struct kaelz4_result *result = compress_ctx->result; result->status = status; - result->dst_len = compress_ctx->dst_len; + result->dst_len = compress_ctx->save_info.dst_len; if (result->ibuf_crc != NULL && status == KAE_LZ4_SUCC && compress_ctx->data_format != KAELZ4_ASYNC_LZ77_RAW) { for (int i = 0; i < compress_ctx->src->buf_num; i++) { *result->ibuf_crc = KAELZ4CRC32(*result->ibuf_crc, compress_ctx->src->buf[i].data, @@ -283,7 +283,7 @@ static void kaelz4_compress_async_callback(struct kaelz4_compress_ctx *compress_ } if (result->obuf_crc != NULL && status == KAE_LZ4_SUCC && compress_ctx->data_format != KAELZ4_ASYNC_LZ77_RAW) { - *result->obuf_crc = KAELZ4CRC32(*result->obuf_crc, compress_ctx->dst->buf[0].data, compress_ctx->dst_len); + *result->obuf_crc = KAELZ4CRC32(*result->obuf_crc, compress_ctx->dst->buf[0].data, compress_ctx->save_info.dst_len); } if (unlikely(status != KAE_LZ4_SUCC)) { @@ -313,6 +313,11 @@ static int kaelz4_triples_rebuild(struct kaelz4_async_req *req, const struct wd_ seqSum = req->zc.seqnum; } + if (req->src_size + req->src_size / 255 + 16 >= save_info->dstCapacity - save_info->dst_len) { + *save_info->status = KAE_LZ4_DST_BUF_OVERFLOW; + return 0; + } + size_t total_src_bytes = req->src_size; U32 seqCount = 0; U32 tempLiteralLength = 0; @@ -428,6 +433,10 @@ static int kaelz4_triples_rebuild_64Kblock(struct kaelz4_async_req *req, const s } else { seqSum = req->zc.seqnum; } + if (req->src_size + save_info->prev_last_lit_len + (req->src_size + save_info->prev_last_lit_len) / 255 + 16 >= save_info->dstCapacity - save_info->dst_len) { + *save_info->status = KAE_LZ4_DST_BUF_OVERFLOW; + return 0; + } size_t total_src_bytes = req->src_size; U32 seqCount = 0; @@ -633,9 +642,13 @@ static void kaelz4_async_compress_cb(int status, void *param) kaelz4_ctx_t* kaelz4_ctx = (kaelz4_ctx_t*)zc->kaeConfig; struct wcrypto_comp_op_data *op_data = &kaelz4_ctx->op_data; - if (status != 0) { + if (status != WCRYPTO_STATUS_NULL) { + if (status == WD_IN_EPARA) { + req->compress_ctx->status = KAE_LZ4_DST_BUF_OVERFLOW; + } else { + req->compress_ctx->status = KAE_LZ4_COMP_FAIL; + } US_ERR("kaelz4_async_compress_cb status %d !\n", status); - req->compress_ctx->status = KAE_LZ4_COMP_FAIL; req->done = 1; return; } @@ -884,6 +897,12 @@ static int kaelz4_async_frame_padding(struct kaelz4_async_req *req, const struct void *dst_after_frameheader = dst_tmp; LZ4F_frameInfo_t frameinfo_ptr = save_info->preferences.frameInfo; + // frame模式下35为对padding大小的评估 + if (req->src_size + req->src_size / 255 + 16 + 35 >= save_info->dstCapacity - save_info->dst_len) { + *save_info->status = KAE_LZ4_DST_BUF_OVERFLOW; + return 0; + } + if (save_info->src->buf_num != 1) { frameinfo_ptr.contentChecksumFlag = LZ4F_noContentChecksum; } @@ -1043,13 +1062,14 @@ void kaelz4_async_instances_deinit(struct kaelz4_async_ctrl *ctrl) static int kaelz4_async_sw_compress(struct kaelz4_async_ctrl *ctrl, struct kaelz4_compress_ctx *compress_ctx) { int ret = -1; - compress_ctx->status = KAE_LZ4_SUCC; if (compress_ctx->data_format == KAELZ4_ASYNC_FRAME && ctrl->sw_compress_frame != NULL) { - ret = ctrl->sw_compress_frame(compress_ctx->dst->buf[0].data, compress_ctx->dstCapacity, compress_ctx->src->buf[0].data, - compress_ctx->srcSize, &compress_ctx->save_info.preferences); + compress_ctx->status = KAE_LZ4_SUCC; + ret = ctrl->sw_compress_frame(compress_ctx->dst->buf[0].data, compress_ctx->save_info.dstCapacity, compress_ctx->src->buf[0].data, + compress_ctx->srcSize, &compress_ctx->save_info.preferences); } else if (compress_ctx->data_format <= KAELZ4_ASYNC_BLOCK && ctrl->sw_compress != NULL) { + compress_ctx->status = KAE_LZ4_SUCC; ret = ctrl->sw_compress(compress_ctx->src->buf[0].data, compress_ctx->dst->buf[0].data, compress_ctx->srcSize, - compress_ctx->dstCapacity); + compress_ctx->save_info.dstCapacity); } ret = (ret == 0) ? KAE_LZ4_SW_RETURN_0_FAIL : ret; return ret; @@ -1075,7 +1095,7 @@ int kaelz4_async_compress_polling(struct kaelz4_async_ctrl *ctrl, int budget) if (likely(compress_ctx->status == KAE_LZ4_SUCC)) { ret = compress_ctx->kaelz4_post_process_handle(req, &req->src, - compress_ctx->dst->buf[0].data + compress_ctx->dst_len, + compress_ctx->dst->buf[0].data + compress_ctx->save_info.dst_len, &compress_ctx->save_info); if (ret < 0) { US_ERR("kaelz4_post_process_handle err. ret=%d\n", ret); @@ -1090,16 +1110,16 @@ int kaelz4_async_compress_polling(struct kaelz4_async_ctrl *ctrl, int budget) } if (ret >= 0 && compress_ctx->status == KAE_LZ4_SUCC) { - compress_ctx->dst_len += ret; + compress_ctx->save_info.dst_len += ret; compress_ctx->status = KAE_LZ4_SUCC; } else { - compress_ctx->dst_len = 0; + compress_ctx->save_info.dst_len = 0; if (compress_ctx->status == KAE_LZ4_SUCC) { compress_ctx->status = KAE_LZ4_COMP_FAIL; } US_ERR("kae post process fail! req index %d src size 0x%lx dst size 0x%lx last %d ret = %d status %d\n", - req->idx, req->src_size, compress_ctx->dstCapacity, req->last, ret, compress_ctx->status); + req->idx, req->src_size, compress_ctx->save_info.dstCapacity, req->last, ret, compress_ctx->status); } if (!req->special_flag) { @@ -1273,8 +1293,18 @@ static void kaelz4_fill_hw_req_dst_buf_list(struct kaelz4_async_req *req, const req->dst.buf_num = 1; struct kaelz4_seq_result *seq_result = dst->buf[0].data + req->idx * KAE_LZ77_SEQ_DATA_SIZE_PER_64K; req->dst.buf[0].data = seq_result->seq_start; - req->dst.buf[0].buf_len = KAE_LZ77_SEQ_DATA_SIZE_PER_64K - sizeof(seq_result->seq_num); + if (req->dst.buf[0].data >= dst->buf[0].data + dst->buf[0].buf_len) { + req->dst.buf[0].buf_len = 0; + return; + } + + if (dst->buf[0].buf_len >= (req->idx + 1) * KAE_LZ77_SEQ_DATA_SIZE_PER_64K) { + req->dst.buf[0].buf_len = KAE_LZ77_SEQ_DATA_SIZE_PER_64K - sizeof(seq_result->seq_num); + } else { + req->dst.buf[0].buf_len = dst->buf[0].data + dst->buf[0].buf_len - req->dst.buf[0].data; + } } + return; } static void kaelz4_fill_hw_req_src_buf_list(struct kaelz4_async_req *req, const struct kaelz4_buffer_list *src, @@ -1413,16 +1443,17 @@ int kaelz4_compress_async(struct kaelz4_async_ctrl *ctrl, const struct kaelz4_bu } compress_ctx->dst = dst; - compress_ctx->dstCapacity = result->dst_len; + compress_ctx->save_info.dstCapacity = result->dst_len; compress_ctx->src = src; compress_ctx->srcSize = result->src_size; compress_ctx->callback = callback; compress_ctx->result = result; compress_ctx->data_format = data_format; compress_ctx->kaelz4_post_process_handle = g_post_process_handle[data_format]; - compress_ctx->dst_len = 0; + compress_ctx->save_info.dst_len = 0; compress_ctx->next = NULL; compress_ctx->status = KAE_LZ4_SUCC; + compress_ctx->save_info.status = &compress_ctx->status; compress_ctx->req_list = NULL; compress_ctx->save_info.preferences = *ptr; compress_ctx->save_info.prev_last_lit_ptr = NULL; @@ -1465,13 +1496,15 @@ int kaelz4_triples_rebuild_impl(const struct kaelz4_buffer_list *src, struct kae size_t remainingLength = result->src_size; // 该值用于保存剩余的待压缩数据长度 unsigned int buf_index = 0; size_t buf_offset = 0; - size_t dst_len = 0; int idx = 0; struct kaelz4_seq_result *seq_result = tuple_buf->buf[0].data; struct kaelz4_priv_save_info save_info = {0}; int ret; save_info.src = src; + save_info.dstCapacity = dst->buf[0].buf_len; + save_info.dst_len = 0; + save_info.status = &result->status; if (ptr) save_info.preferences = *ptr; @@ -1489,14 +1522,14 @@ int kaelz4_triples_rebuild_impl(const struct kaelz4_buffer_list *src, struct kae } req.zc.seqStore.sequencesStart = (seqDef *)seq_result->seq_start; req.zc.seqnum = seq_result->seq_num; - ret = g_post_process_handle[data_format](&req, &req.src, dst->buf[0].data + dst_len, &save_info); - if (ret <= 0) { + ret = g_post_process_handle[data_format](&req, &req.src, dst->buf[0].data + save_info.dst_len, &save_info); + if (ret < 0 || result->status != KAE_LZ4_SUCC) { result->status = KAE_LZ4_COMP_FAIL; return KAE_LZ4_COMP_FAIL; } idx++; - dst_len += ret; + save_info.dst_len += ret; seq_result = (void *)seq_result + KAE_LZ77_SEQ_DATA_SIZE_PER_64K; } @@ -1507,9 +1540,9 @@ int kaelz4_triples_rebuild_impl(const struct kaelz4_buffer_list *src, struct kae } if (result->obuf_crc != NULL) { - *result->obuf_crc = KAELZ4CRC32(*result->obuf_crc, dst->buf[0].data, dst_len); + *result->obuf_crc = KAELZ4CRC32(*result->obuf_crc, dst->buf[0].data, save_info.dst_len); } - result->dst_len = dst_len; + result->dst_len = save_info.dst_len; return KAE_LZ4_SUCC; } -- Gitee From d9a5ec94659d22364da95a39656619cff83010fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4?= Date: Tue, 22 Jul 2025 11:32:21 +0000 Subject: [PATCH 3/5] feat: deal kaezip dst buf error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白凤 --- KAEZlib/include/kaezip.h | 1 + KAEZlib/src/kaezip_async_adapter.c | 2 +- KAEZlib/src/v1/kaezip_async_comp.c | 104 +++++++----------- KAEZlib/src/v1/kaezip_async_comp.h | 1 - KAEZlib/src/v1/wd_queue_memory.c | 37 +++++++ ...adk-support-sgl-zero-copy-for-kaelz4.patch | 85 +++++++------- 6 files changed, 126 insertions(+), 104 deletions(-) diff --git a/KAEZlib/include/kaezip.h b/KAEZlib/include/kaezip.h index 0f1f742..a0b813d 100644 --- a/KAEZlib/include/kaezip.h +++ b/KAEZlib/include/kaezip.h @@ -43,6 +43,7 @@ typedef struct { #define KAE_ZLIB_ALLOC_FAIL 5 #define KAE_ZLIB_SET_FAIL 6 #define KAE_ZLIB_HW_TIMEOUT_FAIL 7 +#define KAE_ZLIB_DST_BUF_OVERFLOW 8 struct kaezip_result { int status; diff --git a/KAEZlib/src/kaezip_async_adapter.c b/KAEZlib/src/kaezip_async_adapter.c index dde9ce5..edcefca 100644 --- a/KAEZlib/src/kaezip_async_adapter.c +++ b/KAEZlib/src/kaezip_async_adapter.c @@ -135,7 +135,7 @@ static int kaezip_check_param_valid(const struct kaezip_buffer_list *src, struct result->dst_len += dst->buf[i].buf_len; } - if (unlikely(src->buf_num > REQ_BUFFER_MAX || dst->buf_num > REQ_BUFFER_MAX)) { + if (unlikely(src->buf_num > REQ_BUFFER_MAX || src->buf_num <= 0 || dst->buf_num > REQ_BUFFER_MAX || dst->buf_num <= 0)) { return KAE_ZLIB_INVAL_PARA; } diff --git a/KAEZlib/src/v1/kaezip_async_comp.c b/KAEZlib/src/v1/kaezip_async_comp.c index dff1f9c..38f326e 100644 --- a/KAEZlib/src/v1/kaezip_async_comp.c +++ b/KAEZlib/src/v1/kaezip_async_comp.c @@ -223,6 +223,17 @@ static void kaezip_async_compress_cb(int status, void *param) kaezip_ctx_t* kz_ctx = req->kz_ctx; struct wcrypto_comp_op_data *op_data = &kz_ctx->op_data; + if (status != WCRYPTO_STATUS_NULL && status != WCRYPTO_NEGTIVE_COMP_ERR) { + if (status == WD_IN_EPARA) { + req->compress_ctx->status = KAE_ZLIB_DST_BUF_OVERFLOW; + } else { + req->compress_ctx->status = KAE_ZLIB_COMP_FAIL; + } + US_ERR("kaezip_async_compress_cb status %d !\n", status); + req->done = 1; + return; + } + kaezip_set_comp_status(kz_ctx); if (kz_ctx->status == KAEZIP_COMP_VERIFY_ERR) { US_ERR("kaezip_async_compress_cb status %d !\n", status); @@ -315,7 +326,7 @@ static void kaezip_find_and_free_kz_ctx(struct kaezip_async_ctrl *ctrl, kaezip_c static void kaezip_do_compress_polling(struct kaezip_async_ctrl *ctrl, struct kaezip_async_req *req) { - if (req->special_flag != 0 || req->kz_ctx == NULL) { + if (req->kz_ctx == NULL) { return; } @@ -423,10 +434,7 @@ int kaezip_async_compress_polling(struct kaezip_async_ctrl *ctrl, int budget) req->idx, req->src_size, compress_ctx->dstCapacity, req->last, ret, compress_ctx->status); } - if (!req->special_flag) { - ctrl->cur_num_in_comp--; - } - + ctrl->cur_num_in_comp--; ctrl->ctx_head = compress_ctx->next; kaezip_compress_async_callback(compress_ctx, compress_ctx->status); compress_ctx = ctrl->ctx_head; @@ -509,13 +517,7 @@ static kaezip_ctx_t *kaezip_async_init_ctx(struct kaezip_async_ctrl *ctrl, int c } (void)kaezip_async_compress_polling(ctrl, 1); - // 如果本线程已经idle,则使用之前已经申请到的kz_ctx - if (ctrl->cur_num_in_comp == 0 && ctrl->kz_ctx[0] != NULL) { - ctrl->ctx_index = 0; - kz_ctx = ctrl->kz_ctx[ctrl->ctx_index]; - } else { - kz_ctx = kaezip_init_v1(kaezip_get_win_size(), is_sgl, comp_optype); - } + kz_ctx = kaezip_init_v1(kaezip_get_win_size(), is_sgl, comp_optype); } ctrl->kz_ctx[ctrl->ctx_index] = kz_ctx; ctrl->kz_ctx[ctrl->ctx_index]->usr_map = ctrl->usr_map; @@ -536,8 +538,6 @@ static kaezip_ctx_t *kaezip_async_init_ctx(struct kaezip_async_ctrl *ctrl, int c kz_ctx = ctrl->kz_ctx[ctrl->ctx_index]; } - ctrl->ctx_index = (ctrl->ctx_index + 1) % MAX_NUM_IN_COMP; - ctrl->cur_num_in_comp++; return kz_ctx; } @@ -554,8 +554,6 @@ static int kaezip_send_async_compress(struct kaezip_async_ctrl *ctrl, struct kae int ret = kaezip_compress_async_impl(req->kz_ctx, &req->src, &req->dst, compress_size, dst_len, (void *)req); if (unlikely(ret != KAE_ZLIB_SUCC)) { kaezip_find_and_free_kz_ctx(ctrl, req->kz_ctx); - ctrl->ctx_index = (ctrl->ctx_index + MAX_NUM_IN_COMP - 1) % MAX_NUM_IN_COMP; - ctrl->cur_num_in_comp--; req->kz_ctx = NULL; US_ERR("Send compress cmd to kae hw failed! status %d\n", ret); return ret; @@ -601,7 +599,7 @@ static void kaezip_fill_hw_req_src_buf_list(struct kaezip_async_req *req, const } } -static int kaezip_async_compress_process(struct kaezip_async_ctrl *ctrl, void *arg, int comp_optype) +static void kaezip_async_compress_process(struct kaezip_async_ctrl *ctrl, void *arg, int comp_optype) { struct kaezip_compress_ctx *compress_ctx = arg; @@ -609,36 +607,31 @@ static int kaezip_async_compress_process(struct kaezip_async_ctrl *ctrl, void *a size_t srcSize = compress_ctx->srcSize; size_t remainingLength = srcSize; // 该值用于保存剩余的待压缩数据长度 + struct kaezip_async_req *req = &compress_ctx->req; + req->done = 0; + req->last = 1; + req->compress_ctx = compress_ctx; + req->next = NULL; + req->idx = 0; + compress_ctx->req_list = req; + if (unlikely(remainingLength == 0)) { + req->kz_ctx = NULL; + req->compress_ctx->status = KAE_ZLIB_COMP_FAIL; + req->done = 1; + return; + } + // 针对zlib的matchlength转换定义的数据结构 - int idx = 0; - while (remainingLength) { - struct kaezip_async_req *req = &compress_ctx->req; - req->idx = idx; - req->special_flag = 0; - req->last = 0; - req->done = 0; - req->compress_ctx = compress_ctx; - req->next = NULL; - kaezip_fill_hw_req_src_buf_list(req, compress_ctx->src); - kaezip_fill_hw_req_dst_buf_list(req, compress_ctx->dst); - remainingLength -= req->src_size; - // 最后一块实际下发给芯片的长度是 src_size - MFLIMIT - if (remainingLength == 0) { - req->last = 1; - } + kaezip_fill_hw_req_src_buf_list(req, compress_ctx->src); + kaezip_fill_hw_req_dst_buf_list(req, compress_ctx->dst); - int ret = KAE_ZLIB_SUCC; - ret = kaezip_send_async_compress(ctrl, req, comp_optype); - compress_ctx->req_list = req; - idx++; - if (ret != KAE_ZLIB_SUCC) { - req->compress_ctx->status = KAE_ZLIB_COMP_FAIL; - req->special_flag = 1; - req->done = 1; - } + int ret = KAE_ZLIB_SUCC; + ret = kaezip_send_async_compress(ctrl, req, comp_optype); + if (ret != KAE_ZLIB_SUCC) { + req->compress_ctx->status = KAE_ZLIB_COMP_FAIL; + req->done = 1; } - - return KAE_ZLIB_SUCC; + return; } static int kaezip_async_block_padding(struct kaezip_async_req *req, const struct wd_buf_list *source, @@ -659,9 +652,6 @@ int kaezip_compress_async(struct kaezip_async_ctrl *ctrl, const struct kaezip_bu kaezip_async_callback callback, struct kaezip_result *result, enum kaezip_async_data_format data_format, int comp_optype) { - if (result->src_size == 0) { - goto err_callback; - } struct kaezip_compress_ctx *compress_ctx = &ctrl->ctx[ctrl->ctx_index]; compress_ctx->dst = dst; @@ -687,23 +677,9 @@ int kaezip_compress_async(struct kaezip_async_ctrl *ctrl, const struct kaezip_bu } ctrl->tail = compress_ctx; - if (unlikely(kaezip_async_compress_process(ctrl, compress_ctx, comp_optype) != KAE_ZLIB_SUCC)) { - goto free_compress_ctx; - } + kaezip_async_compress_process(ctrl, compress_ctx, comp_optype); + ctrl->ctx_index = (ctrl->ctx_index + 1) % MAX_NUM_IN_COMP; + ctrl->cur_num_in_comp++; return KAE_ZLIB_SUCC; - -free_compress_ctx: - ctrl->ctx_head = compress_ctx->next; - if (ctrl->ctx_head == NULL) { - ctrl->tail = NULL; - } -err_callback: - if (ctrl->is_polling) { - return KAE_ZLIB_INVAL_PARA; - } - result->status = KAE_ZLIB_INVAL_PARA; - result->dst_len = 0; - callback(result); - return KAE_ZLIB_INVAL_PARA; } diff --git a/KAEZlib/src/v1/kaezip_async_comp.h b/KAEZlib/src/v1/kaezip_async_comp.h index 577ab7f..29f5af0 100644 --- a/KAEZlib/src/v1/kaezip_async_comp.h +++ b/KAEZlib/src/v1/kaezip_async_comp.h @@ -53,7 +53,6 @@ struct kaezip_async_req { size_t src_size; size_t dst_len; U32 idx; - U32 special_flag; U16 last; U16 buf_start_index; U32 done; diff --git a/KAEZlib/src/v1/wd_queue_memory.c b/KAEZlib/src/v1/wd_queue_memory.c index d98220d..240c631 100644 --- a/KAEZlib/src/v1/wd_queue_memory.c +++ b/KAEZlib/src/v1/wd_queue_memory.c @@ -30,6 +30,36 @@ void kaezip_wd_free_queue(struct wd_queue* queue); +#define ALL_NUMA_NODES 4 +static int g_numa_used_config_nodes[ALL_NUMA_NODES]; +static int g_numa_used_config_count = 0; + +/** + * 解析环境变量 KAE_ZIP_QUEUE_NODES_MASK 的 mask值。 + * 解析方法:十进制转换为2进制,根据每一位的值,确认是否使用对应NUMA。 + * 举例说明: + * export KAE_ZIP_QUEUE_NODES_MASK=15 // 十进制15 → 二进制 1111 → 使用NUMA 0,1,2,3 + * export KAE_ZIP_QUEUE_NODES_MASK=12 // 十进制12 → 二进制 0011 → 使用NUMA 2,3 + * export KAE_ZIP_QUEUE_NODES_MASK=3 // 十进制 3 → 二进制 0011 → 使用NUMA 0,1 + */ +static void parse_numa_env_mask(int mask) { + g_numa_used_config_count = 0; + for (int i = 0; i < ALL_NUMA_NODES; ++i) { + if (mask & (1 << i)) { + g_numa_used_config_nodes[g_numa_used_config_count++] = i; + } + } +} +static int get_numa_mask(int mask) { + if (g_numa_used_config_count == 0) { + parse_numa_env_mask(mask); + } + static int numa_id = 0; + int nid = g_numa_used_config_nodes[numa_id++ % g_numa_used_config_count]; + US_DEBUG("wd_queue use numa %d in this sess.\n", nid); + return 1 << nid; +} + struct wd_queue* kaezip_wd_new_queue(int comp_alg_type, int comp_optype, int is_sgl) { struct wd_queue* queue = (struct wd_queue *)kae_malloc(sizeof(struct wd_queue)); @@ -59,6 +89,13 @@ struct wd_queue* kaezip_wd_new_queue(int comp_alg_type, int comp_optype, int is_ if (is_sgl) queue->capa.priv.is_single_thread = 1; + char *queue_nodes = getenv("KAE_ZIP_QUEUE_NODES_MASK"); + if (queue_nodes != NULL) { + int queue_nodes_all = atoi(queue_nodes); + int numa_mask = get_numa_mask(queue_nodes_all); + queue->node_mask = numa_mask; + } + struct wcrypto_paras *priv = (struct wcrypto_paras *)&(queue->capa.priv); priv->direction = comp_optype; int ret = wd_request_queue(queue); diff --git a/scripts/patches/0008-uadk-support-sgl-zero-copy-for-kaelz4.patch b/scripts/patches/0008-uadk-support-sgl-zero-copy-for-kaelz4.patch index f956c7b..2cd428c 100644 --- a/scripts/patches/0008-uadk-support-sgl-zero-copy-for-kaelz4.patch +++ b/scripts/patches/0008-uadk-support-sgl-zero-copy-for-kaelz4.patch @@ -5,17 +5,17 @@ index 9b36ae9..0df1dfd 100644 @@ -41,6 +41,7 @@ pkginclude_HEADERS = include/wd.h include/wd_cipher.h include/wd_aead.h \ nobase_pkginclude_HEADERS = v1/wd.h v1/wd_cipher.h v1/wd_aead.h v1/uacce.h v1/wd_dh.h \ v1/wd_digest.h v1/wd_rsa.h v1/wd_bmm.h - + +nobase_pkginclude_HEADERS += v1/wd_sgl.h lib_LTLIBRARIES=libwd.la libwd_comp.la libwd_crypto.la libwd_dae.la - + uadk_driversdir=$(libdir)/uadk diff --git a/uadk/v1/drv/hisi_qm_udrv.c b/uadk/v1/drv/hisi_qm_udrv.c -index 7b0183b..4959aa3 100644 +index 7b0183b..224aadb 100644 --- a/uadk/v1/drv/hisi_qm_udrv.c +++ b/uadk/v1/drv/hisi_qm_udrv.c @@ -53,11 +53,11 @@ static int qm_hw_sgl_info(struct hw_sgl_info *sgl_info) - + /* 'num' starts from 1 */ static int qm_hw_sgl_sge_init(struct wd_sgl *sgl, struct hisi_sgl *hisi_sgl, - struct wd_mm_br *br, int num, __u32 buf_sz) @@ -28,17 +28,17 @@ index 7b0183b..4959aa3 100644 + buf = sgl->sge[num - 1].buf; if (!buf) return -WD_EINVAL; - + @@ -69,16 +69,17 @@ static int qm_hw_sgl_sge_init(struct wd_sgl *sgl, struct hisi_sgl *hisi_sgl, } - + hisi_sgl->sge_entries[num - 1].len = buf_sz; - drv_set_sgl_sge_pri(sgl, num - 1, &hisi_sgl->sge_entries[num - 1]); + sgl->sge[num - 1].priv = &hisi_sgl->sge_entries[num - 1]; - + return WD_SUCCESS; } - + /* 'num' starts from 1 */ static void qm_hw_sgl_sge_uninit(struct wd_sgl *sgl, struct hisi_sgl *hisi_sgl, - int num, struct wd_mm_br *br, __u32 buf_sz) @@ -46,11 +46,11 @@ index 7b0183b..4959aa3 100644 { void *buf; + __u32 buf_sz = sgl->sge[num - 1].data_len; - + buf = wd_get_sge_buf(sgl, num); if (!buf) @@ -90,24 +91,17 @@ static void qm_hw_sgl_sge_uninit(struct wd_sgl *sgl, struct hisi_sgl *hisi_sgl, - + static int qm_hw_sgl_init(void *pool, struct wd_sgl *sgl) { - int buf_num = wd_get_sgl_buf_num(sgl); @@ -61,12 +61,12 @@ index 7b0183b..4959aa3 100644 struct wd_mm_br *br; int i, j, ret; - __u32 buf_sz; - + if (!pool || buf_num < 0 || sge_num < 0) { WD_ERR("hw_sgl_init init param err!\n"); return -WD_EINVAL; } - + - ret = wd_get_sgl_bufsize(sgl, &buf_sz); - if (ret) { - WD_ERR("failed to get sgl bufsize!\n"); @@ -78,7 +78,7 @@ index 7b0183b..4959aa3 100644 br = drv_get_br(pool); @@ -123,7 +117,7 @@ static int qm_hw_sgl_init(void *pool, struct wd_sgl *sgl) hisi_sgl->next_dma = 0; - + for (i = 0; i < buf_num; i++) { - ret = qm_hw_sgl_sge_init(sgl, hisi_sgl, br, i + 1, buf_sz); + ret = qm_hw_sgl_sge_init(sgl, hisi_sgl, br, i + 1); @@ -88,22 +88,22 @@ index 7b0183b..4959aa3 100644 @@ -136,12 +130,12 @@ static int qm_hw_sgl_init(void *pool, struct wd_sgl *sgl) drv_set_sgl_sge_pri(sgl, i, &hisi_sgl->sge_entries[i]); } - + - drv_set_sgl_pri(sgl, hisi_sgl); + sgl->priv = hisi_sgl; return WD_SUCCESS; - + sgl_sge_init_err: for (j = i - 1; j >= 0; j--) - qm_hw_sgl_sge_uninit(sgl, hisi_sgl, j + 1, br, buf_sz); + qm_hw_sgl_sge_uninit(sgl, hisi_sgl, j + 1, br); - + br->free(br->usr, hisi_sgl); - + @@ -609,10 +603,15 @@ int qm_send(struct wd_queue *q, void **req, __u32 num) int ret; __u32 i; - + - wd_fair_lock(&info->sd_lock); + if (!q->capa.priv.is_single_thread) { + wd_fair_lock(&info->sd_lock); @@ -132,18 +132,27 @@ index 7b0183b..4959aa3 100644 } @@ -633,7 +634,8 @@ int qm_send(struct wd_queue *q, void **req, __u32 num) } - + ret = qm_tx_update(info, num); - wd_fair_unlock(&info->sd_lock); + if (!q->capa.priv.is_single_thread) + wd_fair_unlock(&info->sd_lock); - + return ret; } +@@ -686,7 +688,7 @@ int qm_rx_update(struct qm_queue_info *info, __u32 num) + return ret; + + /* make sure queue status check is complete. */ +- rmb(); ++ // rmb(); + + /* set c_flag to enable interrupt when use poll */ + info->db(info, DOORBELL_CMD_CQ, info->cq_head_index, info->is_poll); @@ -709,7 +711,9 @@ int qm_recv(struct wd_queue *q, void **resp, __u32 num) if (unlikely(ret)) return ret; - + - wd_fair_lock(&info->rc_lock); + if (!q->capa.priv.is_single_thread) + wd_fair_lock(&info->rc_lock); @@ -176,11 +185,11 @@ index 7b0183b..4959aa3 100644 @@ -751,7 +759,8 @@ int qm_recv(struct wd_queue *q, void **resp, __u32 num) ret = i; } - + - wd_fair_unlock(&info->rc_lock); + if (!q->capa.priv.is_single_thread) + wd_fair_unlock(&info->rc_lock); - + return ret; } diff --git a/uadk/v1/libwd.map b/uadk/v1/libwd.map @@ -189,7 +198,7 @@ index d53201b..a43a884 100644 +++ b/uadk/v1/libwd.map @@ -139,6 +139,8 @@ global: wcrypto_rng_poll; - + wd_sglpool_create; + wd_build_sgl; + wd_destory_sgl; @@ -205,11 +214,11 @@ index 4618a8c..51a2061 100644 __u8 direction; __u8 is_poll; + __u8 is_single_thread; - + /* to be extended */ }; diff --git a/uadk/v1/wd_comp.c b/uadk/v1/wd_comp.c -index 169f1b4..87393ae 100644 +index 169f1b4..209fab0 100644 --- a/uadk/v1/wd_comp.c +++ b/uadk/v1/wd_comp.c @@ -253,16 +253,22 @@ int wcrypto_do_comp(void *ctx, struct wcrypto_comp_op_data *opdata, void *tag) @@ -218,12 +227,12 @@ index 169f1b4..87393ae 100644 __u64 recv_count = 0; - int ret; + int ret = 0; - + if (unlikely(!ctx || !opdata || !opdata->in || !opdata->out)) { WD_ERR("invalid: comp input parameter err!\n"); return -EINVAL; } - + - ret = wd_get_cookies(&cctx->pool, (void **)&cookie, 1); - if (ret) - return ret; @@ -236,17 +245,17 @@ index 169f1b4..87393ae 100644 + } else { + cookie = opdata->cookie; + } - + msg = &cookie->msg; if (tag) { @@ -314,6 +320,7 @@ int wcrypto_do_comp(void *ctx, struct wcrypto_comp_op_data *opdata, void *tag) - + err_put_cookie: wd_put_cookies(&cctx->pool, (void **)&cookie, 1); + opdata->cookie = NULL; return ret; } - + @@ -355,7 +362,9 @@ int wcrypto_comp_poll(struct wd_queue *q, unsigned int num) tag = (void *)(uintptr_t)resp->udata; ctx = tag->wcrypto_tag.ctx; @@ -257,7 +266,7 @@ index 169f1b4..87393ae 100644 + resp = NULL; } while (--tmp); - + diff --git a/uadk/v1/wd_comp.h b/uadk/v1/wd_comp.h index 4c84ea3..8c0d9e5 100644 --- a/uadk/v1/wd_comp.h @@ -268,7 +277,7 @@ index 4c84ea3..8c0d9e5 100644 void *priv; + void *cookie; }; - + struct wcrypto_comp_msg { diff --git a/uadk/v1/wd_sgl.c b/uadk/v1/wd_sgl.c index cb3b8ee..fc1f329 100644 @@ -277,7 +286,7 @@ index cb3b8ee..fc1f329 100644 @@ -44,30 +44,6 @@ #define ALIGN_SIZE_MAX 0x800 #define ALIGN_SIZE 64 - + -struct wd_sge { - /* 'priv' is used by driver, which may be a hardware sgl address */ - void *priv; @@ -311,7 +320,7 @@ index cb3b8ee..fc1f329 100644 sgl_sge_init(sgl_blk[i], j, buf); + sgl_blk[i]->sge[j].data_len = sp->buf_size; } - + ret = drv_init_sgl(q, pool, sgl_blk[i]); @@ -1029,3 +1006,67 @@ void wd_sgl_memset(struct wd_sgl *sgl, int ch) for (i = 0; i < sgl->buf_num; i++) @@ -382,13 +391,13 @@ index cb3b8ee..fc1f329 100644 + return drv_uninit_sgl(q, pool, sgl); +} diff --git a/uadk/v1/wd_sgl.h b/uadk/v1/wd_sgl.h -index e2e82f2..44cbfb3 100644 +index e2e82f2..e254daa 100644 --- a/uadk/v1/wd_sgl.h +++ b/uadk/v1/wd_sgl.h -@@ -43,7 +45,45 @@ struct wd_sglpool_setup { +@@ -43,7 +43,45 @@ struct wd_sglpool_setup { struct wd_mm_br br; }; - + +struct wd_buf { + size_t buf_len; + void *data; -- Gitee From 40d7f68539eb736534159c191792604fb34e94b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4?= Date: Tue, 22 Jul 2025 11:33:31 +0000 Subject: [PATCH 4/5] test: add dst error testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白凤 --- KAELz4/test/kzip/README.md | 22 +- KAELz4/test/kzip/scene_test_functions/entry.c | 4 + .../test_async_dst_buf_less_deflate.c | 447 ++++++++++++++++++ .../test_async_dst_buf_less_lz77_raw.c | 437 +++++++++++++++++ .../scene_test_functions/test_async_sgl.c | 11 +- KAEZlib/README.md | 16 +- 6 files changed, 921 insertions(+), 16 deletions(-) create mode 100644 KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_deflate.c create mode 100644 KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_lz77_raw.c diff --git a/KAELz4/test/kzip/README.md b/KAELz4/test/kzip/README.md index 0081771..361f799 100644 --- a/KAELz4/test/kzip/README.md +++ b/KAELz4/test/kzip/README.md @@ -119,8 +119,9 @@ export KAE_LZ4_ASYNC_THREAD_NUM=8 sh runPerf.sh -A kaelz4async_frame -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 64 -p 0 -f [path to calgary.tar] ``` + zlib下deflate_raw格式异步压缩接口测试: -``` +```shell # 1、单IO时延测试:等价串行流程,结果表示单个IO的压缩时延。 sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 1 -p 1 -f [path to calgary.tar] @@ -131,8 +132,25 @@ sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 -i 8 -p 1 -f [path to calgary.tar] ``` +zlib下deflate_raw格式单个进程同时使用多个KAE的测试: +```shell +# 设置环境变量,以使用多个KAE。 +# 注意:跨numa使用KAE会影响性能。达到最优性能推荐当前进程使用自身所处CPU节点对应的numa上的KAE。 +# 以下测试命令将进程绑定到numa0上,并同时使用numa0和numa1对应的KAE。 +export KAE_ZIP_QUEUE_NODES_MASK=3 # 使用NUMA 0,1 +# 双KAE压缩解压能力测试 +sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -k 1 -i 64 -p 1 -e 2 -f [path to calgary.tar] + +# 环境变量 KAE_ZIP_QUEUE_NODES_MASK 的使用说明: +# export KAE_ZIP_QUEUE_NODES_MASK=15 # 十进制15 → 二进制 1111 → 使用NUMA 0,1,2,3 +# export KAE_ZIP_QUEUE_NODES_MASK=12 # 十进制12 → 二进制 0011 → 使用NUMA 2,3 +# export KAE_ZIP_QUEUE_NODES_MASK=11 # 十进制11 → 二进制 1011 → 使用NUMA 0,1,3 +# export KAE_ZIP_QUEUE_NODES_MASK=7 # 十进制7 → 二进制 0111 → 使用NUMA 0,1,2 +# export KAE_ZIP_QUEUE_NODES_MASK=5 # 十进制5 → 二进制 0101 → 使用NUMA 0,3 +``` + ``` # 单一场景接口组合使用demo测试 -export LD_LIBRARY_PATH=/usr/local/kaelz4/lib/:$LD_LIBRARY_PATH +export LD_LIBRARY_PATH=/usr/local/kaezip/lib/:/usr/local/kaelz4/lib/:$LD_LIBRARY_PATH ./kzip -T 1 ``` \ No newline at end of file diff --git a/KAELz4/test/kzip/scene_test_functions/entry.c b/KAELz4/test/kzip/scene_test_functions/entry.c index b634ed9..61d3f06 100644 --- a/KAELz4/test/kzip/scene_test_functions/entry.c +++ b/KAELz4/test/kzip/scene_test_functions/entry.c @@ -10,6 +10,8 @@ void test_async_polling_interface(); void test_async_SGL_data(); void test_async_lz77_raw(); void test_async_zlib_deflate_raw(); +void test_async_err_dst_buf_less_deflate(); +void test_async_err_dst_buf_less_lz77_raw(); typedef struct { const char *test_name; @@ -26,6 +28,8 @@ test_case_t test_cases[] = { {"异步SGL模式测试", test_async_SGL_data}, {"异步polling模式 lz77_raw数据后处理接口测试", test_async_lz77_raw}, {"异步zlib接口 deflate_raw数据测试", test_async_zlib_deflate_raw}, + {"异步zlib接口dest空间不足时重试测试", test_async_err_dst_buf_less_deflate}, + {"异步lz4 接口lz77_raw 格式dest空间不足时重试测试", test_async_err_dst_buf_less_lz77_raw}, }; #define NUM_TESTS (sizeof(test_cases) / sizeof(test_case_t)) diff --git a/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_deflate.c b/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_deflate.c new file mode 100644 index 0000000..4b70cdb --- /dev/null +++ b/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_deflate.c @@ -0,0 +1,447 @@ +#include +#include +#include +#include +#include +#include + +#include // for Bytef +#include // for O_RDONLY and open +#include // for munmap +#include + +#include "kaezip.h" + +#define HPAGE_SIZE (2 * 1024 * 1024) // 2MB大页 +#define PAGE_SHIFT 12 +#define PAGE_SIZE (1UL << PAGE_SHIFT) +#define PFN_MASK ((1UL << 55) - 1) + +#define TEST_FILE_PATH "../../../scripts/compressTestDataset/calgary" + +static int g_has_done = 0; // 异步回调是否完成。需要初始化为0。 +static int g_inflate_type = 0; // 是否使用zlib异步解压。0:同步解压;1:异步解压。 + +struct my_custom_data { + void *src; + void *dst; + struct kaezip_buffer_list src_list; + struct kaezip_buffer_list dest_list; + void *src_decompd; + struct kaezip_buffer_list src_decompd_list; + size_t src_len; + size_t dst_len; + size_t src_decompd_len; +}; +struct cache_page_map { + uint64_t *entries; + size_t entries_num; + void *base_vaddr; +}; + +static struct cache_page_map* init_cache_page_map(void *base_vaddr, size_t total_size) +{ + struct cache_page_map *cache = malloc(sizeof(struct cache_page_map)); + if (!cache) return NULL; + + int fd = open("/proc/self/pagemap", O_RDONLY); + if (fd < 0) { + perror("打开/proc/self/pagemap失败"); + free(cache); + return NULL; + } + + // 根据申请大小计算需要读取的条目数 + size_t pages_num = total_size / PAGE_SIZE; + cache->entries_num = pages_num; + + cache->base_vaddr = base_vaddr; + + // 分配缓存空间 + cache->entries = malloc(pages_num * sizeof(uint64_t)); + if (!cache->entries) { + close(fd); + free(cache); + return NULL; + } + + // 计算文件偏移量(基地址为第一个条目,即申请到的虚拟地址对应的页面) + uintptr_t base = (uintptr_t)base_vaddr; + uintptr_t first_offset = (base / PAGE_SIZE) * sizeof(uint64_t); + + // 定位到起始位置 + if (lseek(fd, first_offset, SEEK_SET) != first_offset) { + perror("lseek失败"); + close(fd); + free(cache->entries); + free(cache); + return NULL; + } + + // 读取该次申请到的所有条目 + if (read(fd, cache->entries, pages_num * sizeof(uint64_t)) != (ssize_t)(pages_num * sizeof(uint64_t))) { + perror("读取条目失败"); + close(fd); + free(cache->entries); + free(cache); + return NULL; + } + close(fd); + return cache; +} + +#define MAP_HUGE_1GB (30 << MAP_HUGE_SHIFT) +static void *get_huge_pages(size_t total_size) +{ + void *addr = mmap( + NULL, + total_size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB | MAP_HUGE_1GB, + -1, 0 + ); // 申请内存大页 + + if (addr == MAP_FAILED) { + fprintf(stderr, "申请内存大页失败。\n"); + fprintf(stderr, "系统可能没有足够的大页可用。\n"); + fprintf(stderr, "请尝试分配更多大页: sudo sysctl vm.nr_hugepages=10000\n"); + exit(EXIT_FAILURE); + } + + return addr; +} + +static uint64_t get_physical_address_cache_page_map(struct cache_page_map *cache, void *vaddr) { + uintptr_t virtual_addr = (uintptr_t)vaddr; + + // 计算在缓存中的条目索引 + uintptr_t base = (uintptr_t)cache->base_vaddr; + uintptr_t index = (virtual_addr - base) / PAGE_SIZE; + + // printf("uintptr_t index = %ld . entries_num = %ld \n", index, cache->entries_num); + if (index >= cache->entries_num) { + fprintf(stderr, "地址超出缓存范围\n"); + return 0; + } + + uint64_t entry = cache->entries[index]; + + if (!(entry & (1ULL << 63))) { + fprintf(stderr, "页面不存在\n"); + return 0; + } + + // 提取物理帧号(PFN) + uint64_t pfn = entry & PFN_MASK; + return (pfn << PAGE_SHIFT) | (virtual_addr & (PAGE_SIZE - 1)); +} + +static void* get_physical_address_wrapper(void *usr, void *vaddr, size_t sz) +{ + struct cache_page_map *cache = (struct cache_page_map *)usr; + uint64_t phys_addr = get_physical_address_cache_page_map(cache, vaddr); + return (void*)(uintptr_t)phys_addr; +} + +// 给tuple_buf分配 input_size 大小的真实物理内存,用于存放压缩前的file数据。 或者用于存放压缩后的数据。 +static int prepare_physical_buf(void **tuple_buf, size_t input_size, struct cache_page_map** page_cache, FILE* file) +{ + int huge_page_num = (int)(input_size * sizeof(Bytef) / HPAGE_SIZE) + 1; // 大页大小为2M,申请大页时申请大小需为大页大小的整数倍 + size_t total_size = huge_page_num * HPAGE_SIZE; + + *tuple_buf = get_huge_pages(total_size); + if (*tuple_buf == NULL) { + return -1; + } + if(file == NULL) { + memset(*tuple_buf, 0, total_size); + } else { + (void)fread(*tuple_buf, 1, input_size, file); + } + + struct cache_page_map* cache = init_cache_page_map(*tuple_buf, total_size); + if (cache == NULL) { + printf("init_cache_page_map failed\n"); + return -1; + } + *page_cache = cache; + + return 0; +} + +static void *g_page_info = NULL; +static size_t read_inputFile(const char* fileName, void** input) +{ + FILE* sourceFile = fopen(fileName, "r"); + if (sourceFile == NULL) { + fprintf(stderr, "%s not exist!\n", fileName); + return 0; + } + int fd = fileno(sourceFile); + struct stat fs; + (void)fstat(fd, &fs); + size_t input_size = fs.st_size; + + prepare_physical_buf(input, input_size, (struct cache_page_map**)&g_page_info, sourceFile); + + fclose(sourceFile); + + return input_size; +} + +static void release_huge_pages(void *addr, size_t total_size) +{ + munmap(addr, total_size); +} + +static void check_results(struct my_custom_data *my_data) { + if (my_data->src_decompd_len != my_data->src_len) { + printf("Test Error: 解压后与原始长度不一样. 原始长度=%ld 压缩后再解压得到长度=%ld \n", + my_data->src_len, + my_data->src_decompd_len); + } + + // 比较解压后的数据和原始数据 + if (memcmp(my_data->src_decompd, my_data->src_list.buf[0].data, my_data->src_decompd_len) == 0) { + if(g_inflate_type == 0) { + printf("Test Success for zlib deflate raw with async deflate and sync inflate.\n"); + } else { + printf("Test Success for zlib deflate raw wuth async deflate and async inflate.\n"); + } + } else { + printf("Test Error:Decompressed data does not match the original data.\n"); + } +} + +static void decompression_callback3(struct kaezip_result *result) +{ + if (result->status != 0) { + printf("DeCompression failed with status: %d\n", result->status); + return; + } + // 在回调中获取解压的数据 + struct my_custom_data *my_data = (struct my_custom_data *)result->user_data; + void *compressed_data = my_data->src_decompd_list.buf[0].data; + my_data->src_decompd_len = result->dst_len; + + my_data->src_decompd = compressed_data; + + check_results(my_data); + + g_has_done = 1; +} +static int decompressAsync(struct my_custom_data *mydata) +{ + iova_map_fn usr_map = get_physical_address_wrapper; + void *desess = KAEZIP_create_async_decompress_session(usr_map); + + // 使用真实的压缩数据长度作为解压时的输入长度。 + mydata->dest_list.buf[0].buf_len = mydata->dst_len; + + // 提供超出原始数据大小的缓冲区,以确保解压时数据不会溢出 + size_t tmp_size = mydata->src_len * 2; + void *tmp_buf = NULL; + struct cache_page_map *tmp_page_info = {0}; + prepare_physical_buf(&tmp_buf, tmp_size, &tmp_page_info, NULL); + struct kaezip_buffer src_decomped_buf_array[128]; + mydata->src_decompd_list.buf_num = 1; + mydata->src_decompd_list.buf = src_decomped_buf_array; + mydata->src_decompd_list.buf[0].data = tmp_buf; + mydata->src_decompd_list.buf[0].buf_len = tmp_size; + mydata->src_decompd_list.usr_data = tmp_page_info; + + // 异步解压结果 + struct kaezip_result result = {0}; + result.user_data = mydata; + + // 将 mydata->dest_list.buf[0].data 中的数据进行解压,解压结果放在 mydata->src_decompd_list.buf[0].data 中。 + int compression_status = KAEZIP_decompress_async_in_session(desess, &mydata->dest_list, &mydata->src_decompd_list, + decompression_callback3, &result); + + if (compression_status != 0) { + printf("deCompression failed with error code: %d\n", compression_status); + release_huge_pages(tmp_buf, tmp_size); + return -1; + } + while (g_has_done != 1) { + KAEZIP_compress_async_polling_in_session(desess, 1); + usleep(100); + } + KAEZIP_destroy_async_decompress_session(desess); + release_huge_pages(tmp_buf, tmp_size); + return compression_status; +} + +static int retry_compression(struct my_custom_data *my_data); +static int g_total_trytimes = 0; +static void compression_callback3(struct kaezip_result *result) +{ + struct my_custom_data *my_data = (struct my_custom_data *)result->user_data; + + if (result->status == 8) { + if (g_total_trytimes > 6) { + printf("尝试多次失败,退出\n"); + return; + } + g_total_trytimes++; + printf("Compression callback failed with status: %d , retrying...\n", result->status); + + // 清理旧的压缩目标缓存 + struct kaezip_buffer *buf = &my_data->dest_list.buf[0]; + size_t old_size = buf->buf_len; + release_huge_pages(buf->data, old_size); + + // 重新分配更大的缓冲区 + size_t new_size = old_size * 2; + void *new_buf = NULL; + struct cache_page_map *new_page_info = NULL; + prepare_physical_buf(&new_buf, new_size, &new_page_info, NULL); + + buf->data = new_buf; + buf->buf_len = new_size; + my_data->dest_list.usr_data = new_page_info; + + // 重试压缩 + retry_compression(my_data); + return; + } + if (result->status != 0) { + printf("Compression failed with status: %d\n", result->status); + return; + } + // 在回调中获取压缩后的数据 + size_t compressed_size = result->dst_len; + void *compressed_data = my_data->dest_list.buf[0].data; + my_data->dst_len = compressed_size; + + if (g_inflate_type == 0) { // 同步解压。 + // 为解压数据分配内存 + size_t tmp_src_len = result->src_size * 10; + void *dst_buffer = malloc(tmp_src_len); + if (!dst_buffer) { + printf("Memory allocation failed for decompressed data.\n"); + return; + } + + int ret = -1; + z_stream strm; + strm.zalloc = (alloc_func)0; + strm.zfree = (free_func)0; + strm.opaque = (voidpf)0; + (void)inflateInit2_(&strm, -15, "1.2.11", sizeof(z_stream)); + strm.next_in = (z_const Bytef *)compressed_data; + strm.next_out = dst_buffer; + strm.avail_in = compressed_size; + strm.avail_out = tmp_src_len; + ret = inflate(&strm, Z_FINISH); + + tmp_src_len = strm.total_out; + // inflateReset(&strm); + (void)inflateEnd(&strm); + if (ret < Z_OK) { + printf("[KAE_ERR]:uncompress2 failed, ret is:%d.\n", ret); + } + + if (ret < 0) { + printf("Decompression failed with error code: %d\n", ret); + free(dst_buffer); + return; + } + my_data->src_decompd = dst_buffer; + my_data->src_decompd_len = tmp_src_len; + + check_results(my_data); + // 释放解压后的数据 + free(dst_buffer); + g_has_done = 1; + } else { + decompressAsync(my_data); + } +} +static int retry_compression(struct my_custom_data *my_data) +{ + iova_map_fn usr_map = get_physical_address_wrapper; + void *sess = KAEZIP_create_async_compress_session(usr_map); + + struct kaezip_result result = {0}; + result.user_data = my_data; + g_has_done = 0; + + int status = KAEZIP_compress_async_in_session(sess, &my_data->src_list, &my_data->dest_list, + compression_callback3, &result); + if (status != 0) { + printf("Retry compression failed with status: %d\n", status); + return -1; + } + while (g_has_done != 1) { + KAEZIP_compress_async_polling_in_session(sess, 1); + usleep(100); + } + KAEZIP_destroy_async_compress_session(sess); + return 0; +} + +static int test_main() +{ + g_has_done = 0; + size_t src_len = 0; + void *inbuf = NULL; + + src_len = read_inputFile(TEST_FILE_PATH, &inbuf); + + iova_map_fn usr_map = get_physical_address_wrapper; + void *sess = KAEZIP_create_async_compress_session(usr_map); + + // 异步压缩 + struct kaezip_result result = {0}; + struct my_custom_data mydata = {0}; + + struct kaezip_buffer src_buf[128]; + mydata.src_list.usr_data = g_page_info; + mydata.src_list.buf_num = 1; + mydata.src_list.buf = src_buf; + mydata.src_list.buf[0].data = inbuf; + mydata.src_list.buf[0].buf_len = src_len; + + size_t compressed_size = compressBound(src_len); + compressed_size = compressed_size / 15; // 分配一个不足的dest空间,测试异常返回值 + void *tuple_buf = NULL; + struct cache_page_map *tuple_page_info = {0}; + prepare_physical_buf(&tuple_buf, compressed_size, &tuple_page_info, NULL); + struct kaezip_buffer tuple_buf_array[128]; + mydata.dest_list.buf_num = 1; + mydata.dest_list.buf = tuple_buf_array; + mydata.dest_list.buf[0].data = tuple_buf; + mydata.dest_list.buf[0].buf_len = compressed_size; + mydata.dest_list.usr_data = tuple_page_info; + + mydata.src_len = src_len; + result.user_data = &mydata; + + // 将 my_data->src_list.buf[0].data 中的数据压缩,压缩结果放在 my_data->dest_list.buf[0].data 中。 + int compression_status = KAEZIP_compress_async_in_session(sess, &mydata.src_list, &mydata.dest_list, + compression_callback3, &result); + + if (compression_status != 0) { + printf("Compression failed with error code: %d\n", compression_status); + release_huge_pages(inbuf, src_len); + return -1; + } + + while (g_has_done != 1) { + KAEZIP_compress_async_polling_in_session(sess, 1); + usleep(100); + } + KAEZIP_destroy_async_compress_session(sess); + + release_huge_pages(tuple_buf, src_len); + + return compression_status; +} +int test_async_err_dst_buf_less_deflate() +{ + int ret = test_main(); // 测试异步压缩 -> 同步解压。 + g_inflate_type = 1; // 测试异步压缩 -> 异步解压。 + ret = test_main(); + return ret; +} diff --git a/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_lz77_raw.c b/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_lz77_raw.c new file mode 100644 index 0000000..4e44d2c --- /dev/null +++ b/KAELz4/test/kzip/scene_test_functions/test_async_dst_buf_less_lz77_raw.c @@ -0,0 +1,437 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include // for Bytef +#include // for O_RDONLY and open +#include // for munmap +#include + +#define HPAGE_SIZE (2 * 1024 * 1024) // 2MB大页 +#define PAGE_SHIFT 12 +#define PAGE_SIZE (1UL << PAGE_SHIFT) +#define PFN_MASK ((1UL << 55) - 1) + +static int g_has_done = 0; // 异步回调是否完成。需要初始化为0。 +static int g_file_chunk_size = 256; +static int g_test_frame = 0; // 是否测试frame格式。 +struct my_custom_data { + void *src; + void *tuple; + void *dst; + struct kaelz4_buffer_list src_list; + struct kaelz4_buffer_list tuple_list; + struct kaelz4_buffer_list dst_list; + void *src_decompd; + size_t src_len; + size_t tuple_len; + size_t dst_len; + size_t src_decompd_len; +}; +struct cache_page_map { + uint64_t *entries; + size_t entries_num; + void *base_vaddr; +}; + +static struct cache_page_map* init_cache_page_map(void *base_vaddr, size_t total_size) +{ + struct cache_page_map *cache = malloc(sizeof(struct cache_page_map)); + if (!cache) return NULL; + + int fd = open("/proc/self/pagemap", O_RDONLY); + if (fd < 0) { + perror("打开/proc/self/pagemap失败"); + free(cache); + return NULL; + } + + // 根据申请大小计算需要读取的条目数 + size_t pages_num = total_size / PAGE_SIZE; + cache->entries_num = pages_num; + + cache->base_vaddr = base_vaddr; + + // 分配缓存空间 + cache->entries = malloc(pages_num * sizeof(uint64_t)); + if (!cache->entries) { + close(fd); + free(cache); + return NULL; + } + + // 计算文件偏移量(基地址为第一个条目,即申请到的虚拟地址对应的页面) + uintptr_t base = (uintptr_t)base_vaddr; + uintptr_t first_offset = (base / PAGE_SIZE) * sizeof(uint64_t); + + // 定位到起始位置 + if (lseek(fd, first_offset, SEEK_SET) != first_offset) { + perror("lseek失败"); + close(fd); + free(cache->entries); + free(cache); + return NULL; + } + + // 读取该次申请到的所有条目 + if (read(fd, cache->entries, pages_num * sizeof(uint64_t)) != (ssize_t)(pages_num * sizeof(uint64_t))) { + perror("读取条目失败"); + close(fd); + free(cache->entries); + free(cache); + return NULL; + } + close(fd); + return cache; +} + +static void *get_huge_pages(size_t total_size) +{ + void *addr = mmap( + NULL, + total_size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, + -1, 0 + ); // 申请内存大页 + + if (addr == MAP_FAILED) { + fprintf(stderr, "申请内存大页失败。\n"); + fprintf(stderr, "系统可能没有足够的大页可用。\n"); + fprintf(stderr, "请尝试分配更多大页: sudo sysctl vm.nr_hugepages=10000\n"); + exit(EXIT_FAILURE); + } + + return addr; +} + +static uint64_t get_physical_address_cache_page_map(struct cache_page_map *cache, void *vaddr) { + uintptr_t virtual_addr = (uintptr_t)vaddr; + + // 计算在缓存中的条目索引 + uintptr_t base = (uintptr_t)cache->base_vaddr; + uintptr_t index = (virtual_addr - base) / PAGE_SIZE; + + // printf("uintptr_t index = %ld . entries_num = %ld \n", index, cache->entries_num); + if (index >= cache->entries_num) { + fprintf(stderr, "地址超出缓存范围\n"); + return 0; + } + + uint64_t entry = cache->entries[index]; + + if (!(entry & (1ULL << 63))) { + fprintf(stderr, "页面不存在\n"); + return 0; + } + + // 提取物理帧号(PFN) + uint64_t pfn = entry & PFN_MASK; + return (pfn << PAGE_SHIFT) | (virtual_addr & (PAGE_SIZE - 1)); +} + +static void* get_physical_address_wrapper(void *usr, void *vaddr, size_t sz) +{ + struct cache_page_map *cache = (struct cache_page_map *)usr; + uint64_t phys_addr = get_physical_address_cache_page_map(cache, vaddr); + return (void*)(uintptr_t)phys_addr; +} + + +static void *g_page_info = NULL; +static size_t read_inputFile(const char* fileName, void** input) +{ + FILE* sourceFile = fopen(fileName, "r"); + if (sourceFile == NULL) { + fprintf(stderr, "%s not exist!\n", fileName); + return 0; + } + int fd = fileno(sourceFile); + struct stat fs; + (void)fstat(fd, &fs); + size_t input_size = fs.st_size; + input_size = 63 * 1024; // 小于64k的测试,KAE侧逻辑比较清晰。 + + int huge_page_num = (int)(input_size * sizeof(Bytef) / HPAGE_SIZE) + 1; // 大页大小为2M,申请大页时申请大小需为大页大小的整数倍 + size_t total_size = huge_page_num * HPAGE_SIZE; + *input = get_huge_pages(total_size); + + if (*input == NULL) { + return 0; + } + (void)fread(*input, 1, input_size, sourceFile); + + struct cache_page_map* cache = init_cache_page_map(*input, total_size); + + // printf("初始化数据 %ld \n", cache->entries_num); + // uint64_t phys_addr = get_physical_address_cache_page_map(cache, *input); + + // printf("大页物理地址: 0x%" PRIx64 "\n", phys_addr); + g_page_info = cache; + fclose(sourceFile); + + return input_size; +} + +static void release_huge_pages(void *addr, size_t total_size) +{ + munmap(addr, total_size); +} +static int prepare_tuple_buf(void **tuple_buf, size_t src_len, struct cache_page_map** page_cache) +{ + size_t tuple_buf_len = KAELZ4_compress_get_tuple_buf_len(g_file_chunk_size * 1024) * (src_len / (g_file_chunk_size * 1024) + 1) * 2; + size_t huge_page_num = tuple_buf_len * sizeof(Bytef) / HPAGE_SIZE + 1; // 大页大小为2M,申请大页时申请大小需为大页大小的整数倍 + size_t total_size = huge_page_num * HPAGE_SIZE; + *tuple_buf = get_huge_pages(total_size); + // printf("申请的tuple buf大页虚拟地址: %p len: 0x%lx\n", *tuple_buf, total_size); + + if (*tuple_buf == NULL) { + return -1; + } + + memset(*tuple_buf, 0, total_size); + + struct cache_page_map* cache = init_cache_page_map(*tuple_buf, total_size); + if (cache == NULL) { + printf("init_cache_page_map failed\n"); + return -1; + } + // uint64_t phys_addr = get_physical_address_cache_page_map(cache, *tuple_buf); + // printf("tuple buf大页物理地址: 0x%" PRIx64 "\n", phys_addr); + *page_cache = cache; + + return 0; +} + +static int retry_compression(struct my_custom_data *my_data); +static int g_total_trytimes = 0; + +static void compression_callback3(struct kaelz4_result *result) { + struct my_custom_data *my_data = (struct my_custom_data *)result->user_data; + + if (result->status != 0) { + printf("Compression callback failed with status: %d. retrying...\n", result->status); + + g_total_trytimes++; + if(g_total_trytimes % 1000 == 0) { + printf("依据尝试重试失败%d次\n", g_total_trytimes); + } + + g_has_done = -1; // 表示需要重试 + return; + } + // 在回调中获取压缩后的数据 + + if (g_test_frame == 1) { + LZ4F_preferences_t preferences = {0}; + preferences.frameInfo.blockSizeID = LZ4F_max64KB; // 设定块大小 + if (KAELZ4_rebuild_lz77_to_frame(&my_data->src_list, &my_data->tuple_list, &my_data->dst_list, result, &preferences) != 0) { + printf("[user]KAELZ4_rebuild_lz77_to_frame : %d\n", result->status); + } + } else { + if (KAELZ4_rebuild_lz77_to_block(&my_data->src_list, &my_data->tuple_list, &my_data->dst_list, result) != 0) { + printf("[user]KAELZ4_rebuild_lz77_to_block : %d\n", result->status); + } + } + + + size_t compressed_size = result->dst_len; + void *compressed_data = my_data->dst_list.buf[0].data; + + my_data->dst_len = compressed_size; + + // 使用LZ4解压缩数据 + size_t tmp_src_len = result->src_size * 10; + // 为解压数据分配内存 + void *dst_buffer = malloc(tmp_src_len); + if (!dst_buffer) { + printf("Memory allocation failed for decompressed data.\n"); + return; + } + + size_t ret = -1; + if (g_test_frame == 1) { + LZ4F_decompressionContext_t dctx; + LZ4F_createDecompressionContext(&dctx, 100); + ret = LZ4F_decompress(dctx, dst_buffer, &tmp_src_len, compressed_data, &compressed_size, NULL); + } else { + ret = LZ4_decompress_safe((char *)compressed_data, (char *)dst_buffer, compressed_size, tmp_src_len); + tmp_src_len = ret; // 解压后长度 + } + if (ret < 0) { + printf("Decompression failed with error code: %ld\n", ret); + free(dst_buffer); + return; + } + my_data->src_decompd = dst_buffer; + my_data->src_decompd_len = tmp_src_len; + + if (my_data->src_decompd_len != my_data->src_len) { + printf("Test Error: 解压后与原始长度不一样. result->src_size=%ld 原始长度=%ld 压缩后解压长度=%ld \n", + result->src_size, + my_data->src_len, + my_data->src_decompd_len); + } + + // 比较解压后的数据和原始数据 + if (memcmp(my_data->src_decompd, my_data->src_list.buf[0].data, result->src_size) == 0) { + if (g_test_frame == 1) { + printf("Test Success for less tuple buf with frame.\n"); + } else { + printf("Test Success for less tuple buf with block.\n"); + } + } else { + printf("Test Error:Decompressed data does not match the original data.\n"); + } + + // 释放解压后的数据 + free(dst_buffer); + g_has_done = 1; +} +static int retry_compression(struct my_custom_data *my_data) +{ + iova_map_fn usr_map = get_physical_address_wrapper; + void *sess = KAELZ4_create_async_compress_session(usr_map); + + struct kaelz4_result result = {0}; + result.user_data = my_data; + g_has_done = 0; + + size_t old_size = my_data->tuple_len; // 上一次分配的输出空间大小 + + void *tuple_buf = NULL; + struct cache_page_map *tuple_page_info = {0}; + + size_t src_len = old_size + 1 * 1024; + // 用于快速测试到达边界情况 + size_t limit_len = 60 * 1024; + if(src_len < limit_len) { + src_len = old_size * 2; + src_len = src_len > limit_len ? limit_len : src_len; + } + prepare_tuple_buf(&tuple_buf, src_len, &tuple_page_info); + struct kaelz4_buffer tuple_buf_array[128]; + my_data->tuple_list.buf_num = 1; + my_data->tuple_list.buf = tuple_buf_array; + my_data->tuple_list.buf[0].data = tuple_buf; + my_data->tuple_list.buf[0].buf_len = src_len; + my_data->tuple_list.usr_data = tuple_page_info; + + my_data->tuple_len = src_len; // 更新tuple_len为新的尝试长度 + + int compression_status = KAELZ4_compress_lz77_async_in_session(sess, &my_data->src_list, &my_data->tuple_list, + compression_callback3, &result); + + if (compression_status != 0) { + printf("Compression failed with error code: %d\n", compression_status); + return -1; + } + + while (g_has_done == 0) { + KAELZ4_compress_async_polling_in_session(sess, 1); + usleep(100); + } + KAELZ4_destroy_async_compress_session(sess); + release_huge_pages(tuple_buf, src_len); + + return compression_status; +} + +static int test_lz77_raw_polling(int contentChecksumFlag, int blockChecksumFlag, int contentSizeFlag) +{ + g_has_done = 0; + size_t src_len = 0; // 256KB + void *inbuf = NULL; + + src_len = read_inputFile("../../../scripts/compressTestDataset/calgary", &inbuf); + + + // 为压缩数据分配内存 + size_t compressed_size = LZ4F_compressBound(src_len, NULL); + void *compressed_data = malloc(compressed_size); + if (!compressed_data) { + printf("Memory allocation failed for compressed data.\n"); + free(inbuf); + return -1; + } + + // printf("compressed_size = %ld \n", compressed_size); + + iova_map_fn usr_map = get_physical_address_wrapper; + + void *sess = KAELZ4_create_async_compress_session(usr_map); + + // 异步压缩 + struct kaelz4_result result = {0}; + struct my_custom_data mydata = {0}; + + struct kaelz4_buffer src_buf[128]; + mydata.src_list.usr_data = g_page_info; + mydata.src_list.buf_num = 1; + mydata.src_list.buf = src_buf; + mydata.src_list.buf[0].data = inbuf; + mydata.src_list.buf[0].buf_len = src_len; + + void *tuple_buf = NULL; + struct cache_page_map *tuple_page_info = {0}; + + // 分配的输出空间小于784字节时,压缩错误值为3 + // 大于784字节时,空间又不足时,压缩错误值为8 + // 外部输入数据较大时,KAE内部分片,当分片的最后一片的空间小于784字节时,压缩错误值为3。 + size_t less_src_len = 100; + + prepare_tuple_buf(&tuple_buf, src_len, &tuple_page_info); + struct kaelz4_buffer tuple_buf_array[128]; + mydata.tuple_list.buf_num = 1; + mydata.tuple_list.buf = tuple_buf_array; + mydata.tuple_list.buf[0].data = tuple_buf; + mydata.tuple_list.buf[0].buf_len = less_src_len; + mydata.tuple_list.usr_data = tuple_page_info; + + struct kaelz4_buffer dst_buf[128]; + mydata.dst_list.buf_num = 1; + mydata.dst_list.buf = dst_buf; + mydata.dst_list.buf[0].data = compressed_data; + mydata.dst_list.buf[0].buf_len = compressed_size; + + mydata.src_len = src_len; + mydata.tuple_len = mydata.tuple_list.buf[0].buf_len; + + result.user_data = &mydata; + + int compression_status = KAELZ4_compress_lz77_async_in_session(sess, &mydata.src_list, &mydata.tuple_list, + compression_callback3, &result); + + if (compression_status != 0) { + printf("Compression failed with error code: %d\n", compression_status); + free(inbuf); + free(compressed_data); + return -1; + } + + while (g_has_done == 0) { + KAELZ4_compress_async_polling_in_session(sess, 1); + usleep(100); + } + KAELZ4_destroy_async_compress_session(sess); + + release_huge_pages(tuple_buf, src_len); + while(g_has_done == -1) { + // 重试压缩 + retry_compression(&mydata); + } + + return compression_status; +} +int test_async_err_dst_buf_less_lz77_raw() +{ + int ret = test_lz77_raw_polling(0, 0, 0); + g_test_frame = 1; + ret = test_lz77_raw_polling(0, 0, 0); + return ret; +} diff --git a/KAELz4/test/kzip/scene_test_functions/test_async_sgl.c b/KAELz4/test/kzip/scene_test_functions/test_async_sgl.c index 419b183..0eff8eb 100644 --- a/KAELz4/test/kzip/scene_test_functions/test_async_sgl.c +++ b/KAELz4/test/kzip/scene_test_functions/test_async_sgl.c @@ -228,15 +228,6 @@ static void compression_callback4(struct kaelz4_result *result) { } } - if (memcmp(my_data->src_decompd, src->buf[0].data, src->buf[0].buf_len) == 0) { - printf("buf[0] Test Success.\n"); - } - if (memcmp(my_data->src_decompd + src->buf[0].buf_len, src->buf[1].data, src->buf[1].buf_len) == 0) { - printf("buf[1] Test Success.\n"); - } else { - printf("buf[1] Test Error.\n"); - } - // 比较解压后的数据和原始数据 if (memcmp(my_data->src_decompd, source, result->src_size) == 0) { printf("Test Success.\n"); @@ -298,7 +289,7 @@ static int test_async_frame_and_sgl(int contentChecksumFlag, int blockChecksumFl struct kaelz4_buffer_list src = {0}; struct kaelz4_buffer src_buf[128]; - src.buf_num = 1; + src.buf_num = 1; // change buf_num to 1, 2, 3... to test different SGLs src.buf = src_buf; unsigned int tmp_size = src_len / src.buf_num; for (int i = 0; i < src.buf_num - 1; i++) { diff --git a/KAEZlib/README.md b/KAEZlib/README.md index 182cc4a..0551789 100644 --- a/KAEZlib/README.md +++ b/KAEZlib/README.md @@ -25,6 +25,13 @@ export LD_LIBRARY_PATH=/usr/local/kaezip/lib:$LD_LIBRARY_PATH | `KAEZIP_create_async_decompress_session`| 创建异步解压任务session | | `KAEZIP_decompress_async_in_session` | 提交异步解压任务 | | `KAEZIP_destroy_async_decompress_session` | 销毁解压任务session | +| `KAEZIP_reset_session` | 重置任务session | + +### API注意事项 +- 约束硬件规格为 Kunpeng 920 7280Z +- 当前异步接口仅支持输出 deflate_raw 格式数据 +- 约束每个session只能在同一个线程中使用,所有API接口不保证多线程安全,即不能在多个线程中,调用API接口传入相同的session,否则不保证压缩解压功能正常。不同session之间的资源互斥,建议不同线程创建并使用各自独立的session。 + ### API详细说明 ```c @@ -86,6 +93,11 @@ int KAEZIP_decompress_async_in_session(void *sess, const struct kaezip_buffer_li */ void KAEZIP_destroy_async_decompress_session(void *sess); +/** + * @brief: reset session and hardware ctx, all compress tasks will be canceled. + * @param: sess : session + */ +void KAEZIP_reset_session(void *sess); ``` ### API使用demo @@ -518,10 +530,6 @@ sh runPerf.sh -A kaezlibasync_deflate -m 1 -n 20000 -s [4/8/16/32/64] -r 1 -k 1 ~~~ -### API注意事项 -- 约束硬件规格为 Kunpeng 920 7280Z -- 当前异步接口仅支持输出 deflate_raw 格式数据 -- 约束每个session只能在同一个线程中使用,所有API接口不保证多线程安全,即不能在多个线程中,调用API接口传入相同的session,否则不保证压缩解压功能正常。不同session之间的资源互斥,建议不同线程创建并使用各自独立的session。 # Kunpeng Zlib Acceleration Engine -- Gitee From af99b711cd7fbc10c77a0ed11396cf33765f1e8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4?= Date: Tue, 22 Jul 2025 11:35:17 +0000 Subject: [PATCH 5/5] test: add more than one sess support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白凤 --- .../test/kzip/alg/KAELz4Async/lz4AsyncFrame.c | 2 +- .../test/kzip/alg/KAEZlibAsync/deflateAsync.c | 29 +++++++++-- KAELz4/test/kzip/compress_ctx.h | 2 + KAELz4/test/kzip/main.c | 49 +++++++++++++++---- KAELz4/test/kzip/runPerf.sh | 10 ++-- 5 files changed, 75 insertions(+), 17 deletions(-) diff --git a/KAELz4/test/kzip/alg/KAELz4Async/lz4AsyncFrame.c b/KAELz4/test/kzip/alg/KAELz4Async/lz4AsyncFrame.c index 4851153..9ec445e 100644 --- a/KAELz4/test/kzip/alg/KAELz4Async/lz4AsyncFrame.c +++ b/KAELz4/test/kzip/alg/KAELz4Async/lz4AsyncFrame.c @@ -40,7 +40,7 @@ static int lz4_frame_bound(int src_len) { // if (g_has_custom_frameinfo_config == 1) { // return LZ4F_compressFrameBound(src_len, NULL) * 1.2; // } - return LZ4F_compressFrameBound(src_len, NULL); + return LZ4F_compressFrameBound(src_len, NULL) * 1.2; } // LZ4 frame 初始化 static int lz4_frame_init(struct compress_ctx *ctx) { diff --git a/KAELz4/test/kzip/alg/KAEZlibAsync/deflateAsync.c b/KAELz4/test/kzip/alg/KAEZlibAsync/deflateAsync.c index 593f508..253931d 100644 --- a/KAELz4/test/kzip/alg/KAEZlibAsync/deflateAsync.c +++ b/KAELz4/test/kzip/alg/KAEZlibAsync/deflateAsync.c @@ -45,17 +45,38 @@ static int zlib_bound(int src_len) { } // Zlib 初始化 static int zlib_async_deflate_init(struct compress_ctx *ctx) { - if(ctx->compress_or_decompress == 1) { - ctx->sess = KAEZIP_create_async_compress_session(ctx->usr_map); + if(ctx->sess_count > 1) { + for (int i = 0; i < ctx->sess_count; ++i) { + if(ctx->compress_or_decompress == 1) { + ctx->sess_array[i] = KAEZIP_create_async_compress_session(ctx->usr_map); + } else { + ctx->sess_array[i] = KAEZIP_create_async_decompress_session(ctx->usr_map); + } + if (!ctx->sess_array[i]) { + fprintf(stderr, "Failed to create session %d\n", i); + } + } } else { - ctx->sess = KAEZIP_create_async_decompress_session(ctx->usr_map); + if(ctx->compress_or_decompress == 1) { + ctx->sess = KAEZIP_create_async_compress_session(ctx->usr_map); + } else { + ctx->sess = KAEZIP_create_async_decompress_session(ctx->usr_map); + } } return 0; } static void zlib_async_deflate_cleanup(struct compress_ctx *ctx) { - KAEZIP_destroy_async_compress_session(ctx->sess); + if(ctx->sess_count > 1) { + for (int i = 0; i < ctx->sess_count; ++i) { + if (ctx->sess_array[i]) { + KAEZIP_destroy_async_compress_session(ctx->sess_array[i]); + } + } + } else { + KAEZIP_destroy_async_compress_session(ctx->sess); + } } // Zlib 算法实例 diff --git a/KAELz4/test/kzip/compress_ctx.h b/KAELz4/test/kzip/compress_ctx.h index adc9a0d..36a3e8f 100644 --- a/KAELz4/test/kzip/compress_ctx.h +++ b/KAELz4/test/kzip/compress_ctx.h @@ -71,6 +71,8 @@ struct compress_ctx { int with_crc; unsigned int src_buf_num; void *sess; + void **sess_array; // sess指针数组 + int sess_count; // sess指针数量,默认1 iova_map_fn usr_map; uint64_t *all_delays; int is_polling; diff --git a/KAELz4/test/kzip/main.c b/KAELz4/test/kzip/main.c index e226f3c..c0defd8 100644 --- a/KAELz4/test/kzip/main.c +++ b/KAELz4/test/kzip/main.c @@ -386,9 +386,14 @@ static void compress_async_polling(struct compress_param *param) struct compress_ctx *ctx = param->ctx; while (unlikely(param->done != 1)) { - if (ctx->sess && ctx->algorithm->poll) - ctx->algorithm->poll(ctx->sess, 1); - else { + if ((ctx->sess || ctx->sess_count > 1) && ctx->algorithm->poll) { + if(ctx->sess_count > 1) { + int idx = param->sn % ctx->sess_count; + ctx->algorithm->poll(ctx->sess_array[idx], 1); + } else { + ctx->algorithm->poll(ctx->sess, 1); + } + } else { ctx->param_index = (ctx->param_index + 1) % ctx->inflight_num; param = &ctx->param_buf[ctx->param_index]; } @@ -473,17 +478,30 @@ static void compress_async_callback(struct kaelz4_result *result) return; } +static int get_session_index(struct compress_ctx *ctx) { + int idx = ctx->sn % ctx->sess_count; + return idx; +} + static int do_real_compression(struct compress_ctx *ctx, const struct kaelz4_buffer_list *src, unsigned int *src_len, struct kaelz4_buffer_list *dst, unsigned int *dst_len, void *param) { if (ctx->compress_or_decompress) { // 压缩流程。 if (ctx->algorithm->async_compress) { + if(ctx->sess_count > 1) { + int idx = get_session_index(ctx); + return ctx->algorithm->async_compress(ctx->sess_array[idx], src, dst, compress_async_callback, param); + } return ctx->algorithm->async_compress(ctx->sess, src, dst, compress_async_callback, param); } else { return ctx->algorithm->compress(src->buf[0].data, src_len, dst->buf[0].data, dst_len); } } else { // 解压逻辑 if (ctx->algorithm->async_decompress) { + if(ctx->sess_count > 1) { + int idx = get_session_index(ctx); + return ctx->algorithm->async_decompress(ctx->sess_array[idx], src, dst, compress_async_callback, param); + } return ctx->algorithm->async_decompress(ctx->sess, src, dst, compress_async_callback, param); } else { return ctx->algorithm->decompress(src->buf[0].data, src_len, dst->buf[0].data, dst_len); @@ -493,7 +511,7 @@ static int do_real_compression(struct compress_ctx *ctx, const struct kaelz4_buf } static void compress_ctx_init(struct compress_ctx *ctx, int compress_or_decompress, unsigned int inflight_num, - unsigned int chunk_len, compression_algorithm_t *algorithm, int is_test_crc) + unsigned int chunk_len, compression_algorithm_t *algorithm, int is_test_crc, int sess_nums) { ctx->algorithm = algorithm; ctx->chunk_len = chunk_len; @@ -511,7 +529,12 @@ static void compress_ctx_init(struct compress_ctx *ctx, int compress_or_decompre ctx->usr_map = NULL; if (g_file_chunk_size && ((size_t)g_file_chunk_size * 1024) <= HPAGE_SIZE && ((ctx->algorithm->async_compress != NULL && ctx->compress_or_decompress != 0) || (ctx->algorithm->async_decompress != NULL && !ctx->compress_or_decompress))) { + // 此处src_buf_num可修改为其他值,用于测试多个链表节点的功能和性能。 ctx->src_buf_num = 4; + // 分片为4k的模式下,使用单个buf节点性能最优,比4个节点的情况性能提升约4%。 + if(g_file_chunk_size == 4) { + ctx->src_buf_num = 1; + } } ctx->all_delays = (uint64_t *)malloc(sizeof(uint64_t) * MAX_LATENCY_COUNT); @@ -535,6 +558,9 @@ static void compress_ctx_init(struct compress_ctx *ctx, int compress_or_decompre } ctx->is_polling = g_enable_polling_mode; ctx->is_zlib = strcmp(algorithm->name, "kaezlibasync_deflate") == 0; + + ctx->sess_count = sess_nums; + ctx->sess_array = calloc(ctx->sess_count, sizeof(void *)); } static void compress_ctx_destory(struct compress_ctx *ctx) @@ -1314,7 +1340,7 @@ int round_trip_fuzztest(uint32_t RDGseed) return -1; } struct compress_ctx ctx; - compress_ctx_init(&ctx, compress, inflight_num, chunk_len, algorithm, 0); + compress_ctx_init(&ctx, compress, inflight_num, chunk_len, algorithm, 0, 1); ctx.loop_times = loop_times; if (!ctx.compress_or_decompress) { ctx.loop_times = 1; @@ -1333,7 +1359,7 @@ int round_trip_fuzztest(uint32_t RDGseed) int j; for (j = 0; j < threadNum; j++) { struct thread_compress_args *args = malloc(sizeof(struct thread_compress_args)); - compress_ctx_init(&args->ctx, compress, inflight_num, chunk_len, algorithm, 0); + compress_ctx_init(&args->ctx, compress, inflight_num, chunk_len, algorithm, 0, 1); args->ctx.thread_id = j; args->ctx.loop_times = loop_times; args->in_filename = in_filename; @@ -1388,6 +1414,7 @@ static void usage(void) printf(" -P: use Huge Pages to save uncompress data \n"); printf(" -p: use polling mode to wait for async operation done\n"); printf(" -r: take crc32 checksum when data is callback.default: 0 \n"); + printf(" -e: use how many sessions to test compression at same time.default: 1 \n"); printf(" example: ./kzip -A kaelz4 -m 2 -f ./kzip -o ./kzip.compressd -n 1000\n"); printf(" ./kzip -A kaelz4 -d -m 2 -f ./kzip.compressd -o ./kzip.origin -n 1000\n"); } @@ -1397,7 +1424,7 @@ int main(int argc, char **argv) initialize_algorithms(); init_env_config(); - const char *optstring = "dm:l:n:w:f:o:v:A:hg:s:c:i:t:T:F:r:P:p:"; + const char *optstring = "dm:l:n:w:f:o:v:A:hg:s:c:i:t:T:F:r:P:p:e:"; int ret = 0; int o = 0; int multi = 1; @@ -1417,6 +1444,7 @@ int main(int argc, char **argv) int fuzztest = 0; uint32_t RDGseed = 0; // 随机数据生成种子 int is_test_crc = 0; // 是否每次都带上crc32校验值 + int sess_nums = 1; // 本次压缩任务创建的sess数量。默认1个session。大于1时需要使用数组存储创建的sessions,所有任务按策略使用。 while ((o = getopt(argc, argv, optstring)) != -1) { if(optstring == NULL) continue; @@ -1430,6 +1458,9 @@ int main(int argc, char **argv) case 'd': compress = 0; break; + case 'e': + sess_nums = atoi(optarg); + break; case 'F': fuzztest = 1; RDGseed = (uint32_t)atoi(optarg); @@ -1520,7 +1551,7 @@ int main(int argc, char **argv) } struct compress_ctx *ctx = malloc(sizeof(struct compress_ctx)); - compress_ctx_init(ctx, compress, inflight_num, chunk_len, algorithm, is_test_crc); + compress_ctx_init(ctx, compress, inflight_num, chunk_len, algorithm, is_test_crc, sess_nums); ctx->loop_times = loop_times; if (!ctx->compress_or_decompress && threadNum == 1) { // 如果是分片解压,单独处理 @@ -1531,7 +1562,7 @@ int main(int argc, char **argv) int j; for (j = 0; j < threadNum; j++) { struct thread_compress_args *args = malloc(sizeof(struct thread_compress_args)); - compress_ctx_init(&args->ctx, compress, inflight_num, chunk_len, algorithm, is_test_crc); + compress_ctx_init(&args->ctx, compress, inflight_num, chunk_len, algorithm, is_test_crc, sess_nums); args->ctx.thread_id = j; args->ctx.loop_times = loop_times; args->in_filename = in_filename; diff --git a/KAELz4/test/kzip/runPerf.sh b/KAELz4/test/kzip/runPerf.sh index bf4b1b7..f985b03 100644 --- a/KAELz4/test/kzip/runPerf.sh +++ b/KAELz4/test/kzip/runPerf.sh @@ -5,7 +5,7 @@ export KAE_LZ4_COMP_TYPE=8 export KAE_LZ4_ASYNC_DC_THREAD_NUM=10 # 使用 getopts 解析命令行参数 -while getopts "m:l:n:w:f:o:v:A:h:g:s:c:i:t:p:k:r:P:" opt; do +while getopts "m:l:n:w:f:o:v:A:h:g:s:c:i:t:p:k:r:P:e:" opt; do case $opt in A) # 要测试的算法 Alg="$OPTARG" @@ -37,6 +37,9 @@ while getopts "m:l:n:w:f:o:v:A:h:g:s:c:i:t:p:k:r:P:" opt; do p) isTestPolling="$OPTARG" ;; + e) + sess_nums="$OPTARG" + ;; *) echo "Usage: all params m:l:n:w:f:o:v:A:h:s:c:" exit 1 @@ -54,6 +57,7 @@ testFile=${testFile:="../../../scripts/compressTestDataset/calgary"} useKAENum=${useKAENum:=2} isTestCrc=${isTestCrc:=0} isTestPolling=${isTestPolling:=0} +sess_nums=${sess_nums:=1} buildParams="kaelz4" sh build.sh $buildParams @@ -94,12 +98,12 @@ echo "taskset -c $bindCpu0AndCpu1 ./kzip -d -A $Alg -m $multiProcess -f $testFil date # gdb --args -taskset -c $bindCpu0AndCpu1 ./kzip -A $Alg -m $multiProcess -f $testFile -o $testFileComped -c $cpuConfigStr -n $loppTimes -s $fileChunk -i $inflightNum -t $threadsNum -r $isTestCrc -p $isTestPolling +taskset -c $bindCpu0AndCpu1 ./kzip -A $Alg -m $multiProcess -f $testFile -o $testFileComped -c $cpuConfigStr -n $loppTimes -s $fileChunk -i $inflightNum -t $threadsNum -r $isTestCrc -p $isTestPolling -e $sess_nums date # sleep 1 #taskset -c $bindCpu0AndCpu1 gdb --args ./kzip -d -A "kaezlib_deflate" -m $multiProcess -f $testFileComped -o $testFileOrigin -c $cpuConfigStr -n $loppTimes -s $fileChunk -i $inflightNum -t $threadsNum -r $isTestCrc -taskset -c $bindCpu0AndCpu1 ./kzip -d -A $Alg -m $multiProcess -f $testFileComped -o $testFileOrigin -c $cpuConfigStr -n $loppTimes -s $fileChunk -i $inflightNum -t $threadsNum -r $isTestCrc +taskset -c $bindCpu0AndCpu1 ./kzip -d -A $Alg -m $multiProcess -f $testFileComped -o $testFileOrigin -c $cpuConfigStr -n $loppTimes -s $fileChunk -i $inflightNum -t $threadsNum -r $isTestCrc -p $isTestPolling -e $sess_nums date if [[ ! -f "$testFile" ]]; then echo "Error: 压缩异常!未成功压缩文件" -- Gitee