From 92fea48cd534fb6009fddc6c9319b73de08443fa Mon Sep 17 00:00:00 2001 From: liuzifeng Date: Thu, 7 Nov 2024 10:36:58 +0800 Subject: [PATCH] fix set var --- storage/ctc/ctc_ddl_rewriter_plugin.cc | 127 +++++++++++++++++-------- storage/ctc/ha_ctc.cc | 46 +++++---- storage/ctc/ha_ctc.h | 7 ++ storage/ctc/ha_ctc_ddl.h | 6 ++ 4 files changed, 126 insertions(+), 60 deletions(-) diff --git a/storage/ctc/ctc_ddl_rewriter_plugin.cc b/storage/ctc/ctc_ddl_rewriter_plugin.cc index 64c5b3f..b1bb6a2 100644 --- a/storage/ctc/ctc_ddl_rewriter_plugin.cc +++ b/storage/ctc/ctc_ddl_rewriter_plugin.cc @@ -436,20 +436,6 @@ static int ctc_check_flush(string &, MYSQL_THD thd, bool &need_forward) { return 0; } -static unordered_set set_variable_not_broadcast{"ctc_ddl_local_enabled", "ctc_ddl_enabled"}; -static bool ctc_check_ddl_local_enable(string sql_str, bool &need_forwar) { - transform(sql_str.begin(), sql_str.end(), sql_str.begin(), ::tolower); - - for (auto it : set_variable_not_broadcast) { - if (sql_str.find(it) != sql_str.npos) { - need_forwar = false; - return true; - } - } - - return false; -} - static uint32_t ctc_set_var_option(bool is_null_value, bool is_set_default_value, set_var *setvar) { uint32_t options = 0; @@ -565,29 +551,74 @@ static int ctc_check_set_opt_rule(set_var *setvar, string& name_str, string& use return ret; } -/* 参考set_var.cc: sql_set_variables */ -static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) { - if (ctc_check_ddl_local_enable(sql_str, need_forward)) { +static int ctc_set_user_var_flag(MYSQL_THD thd, string name, string value) { + handlerton* hton = get_ctc_hton(); + thd_sess_ctx_s *sess_ctx = get_or_init_sess_ctx(hton, thd); + if (sess_ctx == nullptr) { + return HA_ERR_OUT_OF_MEM; + } + bool is_flag_set = (value == "1") || (value == "true"); + bool is_flag_unset = (value == "0") || (value == "false") || (value == "NULL"); + auto it = user_var_flag_map.find(name); + if (it != user_var_flag_map.end()) { + if (is_flag_set) { + sess_ctx->set_flag |= it->second; + } else if (is_flag_unset) { + sess_ctx->set_flag &= ~it->second; + } else { + my_printf_error(ER_UNKNOWN_COM_ERROR, "Invalid variable value for '%s': '%s'", MYF(0), + name.c_str(), value.c_str()); + return -1; + } + } + return 0; +} + +static int check_non_system_var(set_var_base *var, bool& need_forward, MYSQL_THD thd) { + need_forward = false; + if (typeid(*var) != typeid(set_var_user)) { return 0; } - List_iterator_fast var_it(thd->lex->var_list); + set_var_user *setvar_user = dynamic_cast(var); + String set_str; + string var_name; + string var_value; + // 参考set_var.cc: set_var_user::print + // set_str由print函数追加"@"和":="生成 + setvar_user->print(thd, &set_str); + if (set_str.ptr() == nullptr || set_str.length() == 0) { + my_printf_error(ER_NOT_ALLOWED_COMMAND, "%s", MYF(0), "The used command is not allowed"); + return -1; + } + string str(set_str.ptr(), set_str.length()); + size_t pos = str.find("@"); + if (pos != str.npos) { + size_t end_pos = str.find(":=", pos); + if (end_pos != str.npos) { + size_t name_start = pos + 1; + size_t name_len = end_pos - name_start; + var_name = str.substr(name_start, name_len); + } + } + size_t value_pos = str.find(":="); + if (value_pos != str.npos) { + size_t value_start = value_pos + 2; + size_t value_len = str.length() - value_start; + var_value = str.substr(value_start, value_len); + } - set_var_base *var = nullptr; + return ctc_set_user_var_flag(thd, var_name, var_value); +} + +static int check_system_var(set_var_base *var, string &sql_str, MYSQL_THD thd, + bool& need_forward, bool& contain_subselect) { + set_var *setvar = dynamic_cast(var); + bool is_set_default_value = false; + bool is_null_value = false; int ret = 0; string name_str; string val_str; - - // broadcast SET_OPTION query with subselect item - bool contain_subselect = false; - if (thd->lex->query_tables) { - contain_subselect = true; - } - var_it.rewind(); - while ((var = var_it++)) { - set_var *setvar = dynamic_cast(var); - bool is_set_default_value = false; - bool is_null_value = false; #ifdef FEATURE_X_FOR_MYSQL_32 if (setvar) { std::function f = [&thd, &need_forward, setvar] @@ -616,19 +647,16 @@ static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) } ret |= ctc_check_set_opt_rule(setvar, name_str, val_str, need_forward); } - } else { - // There's no need to broadcast non-set_var SET_OPTION cmds. - need_forward = false; } - if (need_forward && allow_sqlcmd(thd, "ctc_setopt_disabled") != 0) { - my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "Set global variable query is not allowed (ctc_setopt_disabled = true)"); + my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), + "Set global variable query is not allowed (ctc_setopt_disabled = true)"); return -1; } - if(IS_METADATA_NORMALIZATION() && !contain_subselect && need_forward && setvar) { + if (IS_METADATA_NORMALIZATION() && !contain_subselect && need_forward && setvar) { if (setvar->check(thd) == 0) { - bool var_real_type = false; + bool var_real_type = false; if (setvar->value && setvar->value->result_type() == INT_RESULT) { var_real_type = true; } @@ -636,15 +664,36 @@ static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) #ifdef FEATURE_X_FOR_MYSQL_26 ret = ctc_set_var_meta(thd, options, setvar->base.str, name_str, val_str, var_real_type); #elif defined(FEATURE_X_FOR_MYSQL_32) - ret = ctc_set_var_meta(thd, options, setvar->m_var_tracker.get_var_name() + ret = ctc_set_var_meta(thd, options, setvar->m_var_tracker.get_var_name(), name_str, val_str, var_real_type); #endif } else { thd->clear_error(); - need_forward = false; // 值校验失败, ctc不进行广播并返回成功, 后续报错由MySQL完成 + need_forward = false; // 值校验失败, ctc不进行广播并返回成功, 后续报错由MySQL完成 } } + return ret; +} +/* 参考set_var.cc: sql_set_variables */ +static int ctc_check_set_opt(string &sql_str, MYSQL_THD thd, bool &need_forward) { + List_iterator_fast var_it(thd->lex->var_list); + + set_var_base *var = nullptr; + int ret = 0; + + // broadcast SET_OPTION query with subselect item + bool contain_subselect = false; + if (thd->lex->query_tables) { + contain_subselect = true; + } + var_it.rewind(); + while ((var = var_it++)) { + if (typeid(*var) != typeid(set_var)) { + ret = check_non_system_var(var, need_forward, thd); + } else { + ret = check_system_var(var, sql_str, thd, need_forward, contain_subselect); + } ctc_log_debug("set option %s, need_forward: %d", sql_str.c_str(), need_forward); } if (IS_METADATA_NORMALIZATION() && !contain_subselect) { diff --git a/storage/ctc/ha_ctc.cc b/storage/ctc/ha_ctc.cc index 6032249..c4dfb3d 100644 --- a/storage/ctc/ha_ctc.cc +++ b/storage/ctc/ha_ctc.cc @@ -340,16 +340,6 @@ static inline bool is_create_table_check(MYSQL_THD thd) { return (thd->lex->sql_command == SQLCOM_CREATE_TABLE && thd->lex->is_exec_started()); } -bool user_var_set(MYSQL_THD thd, string target_str) { - user_var_entry *var_entry; - var_entry = find_or_nullptr(thd->user_vars, target_str); - if (var_entry != nullptr && var_entry->ptr() != nullptr) { - ctc_log_debug("thd (%d) has user variable %s", thd->thread_id(), target_str.data()); - return true; - } - return false; -} - dml_flag_t ctc_get_dml_flag(THD *thd, bool is_replace, bool auto_inc_used, bool has_explicit_autoinc, bool dup_update) { dml_flag_t flag; @@ -412,13 +402,22 @@ bool is_alter_table_scan(bool m_error_if_not_empty) { } bool ddl_enabled_normal(MYSQL_THD thd) { - return !user_var_set(thd, "ctc_ddl_local_enabled") && - (ctc_concurrent_ddl == true || user_var_set(thd, "ctc_ddl_enabled")); + handlerton* hton = get_ctc_hton(); + thd_sess_ctx_s *sess_ctx = get_or_init_sess_ctx(hton, thd); + assert(sess_ctx != nullptr); + // 1.CTC_DDL_LOCAL_ENABLED被设置:不能从rewrite插件下发任何SQL语句 + // 2.CTC_DDL_LOCAL_ENABLED没被设置,ctc_concurrent_ddl=true,允许运行到判断是否广播的逻辑中 + // 3.CTC_DDL_LOCAL_ENABLED没被设置,ctc_concurrent_ddl=false,若CTC_DDL_ENABLED被设置,允许运行到判断是否广播的逻辑中 + return !(sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED) && + (ctc_concurrent_ddl == true || (sess_ctx->set_flag & CTC_DDL_ENABLED)); } bool engine_skip_ddl(MYSQL_THD thd) { + handlerton* hton = get_ctc_hton(); + thd_sess_ctx_s *sess_ctx = get_or_init_sess_ctx(hton, thd); + assert(sess_ctx != nullptr); // 接口流程不需要走到参天: 用于参天SYS库操作 - return user_var_set(thd, "ctc_ddl_local_enabled") && ctc_concurrent_ddl == true; + return (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED) && ctc_concurrent_ddl == true; } bool engine_ddl_passthru(MYSQL_THD thd) { @@ -426,7 +425,10 @@ bool engine_ddl_passthru(MYSQL_THD thd) { if (is_initialize() || is_meta_version_upgrading_force()) { return false; } - bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); + handlerton* hton = get_ctc_hton(); + thd_sess_ctx_s *sess_ctx = get_or_init_sess_ctx(hton, thd); + assert(sess_ctx != nullptr); + bool is_mysql_local = (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED); return is_initialize() || !mysqld_server_started || is_mysql_local; } @@ -437,12 +439,13 @@ bool ha_ctc::is_replay_ddl(MYSQL_THD thd) { if (mysql_system_db.find(db_name) != mysql_system_db.end()) { return false; } + + handlerton* hton = get_ctc_hton(); + thd_sess_ctx_s *sess_ctx = get_or_init_sess_ctx(hton, thd); + assert(sess_ctx != nullptr); - if (user_var_set(thd, "ctc_ddl_local_enabled") && user_var_set(thd, "ctc_replay_ddl")) { - return true; - } - - return false; + uint ctc_var_flag = (CTC_DDL_LOCAL_ENABLED | CTC_REPLAY_DDL); + return (sess_ctx->set_flag & ctc_var_flag) == ctc_var_flag; } static int ctc_reg_instance() { @@ -1297,6 +1300,7 @@ thd_sess_ctx_s *get_or_init_sess_ctx(handlerton *hton, THD *thd) { sess_ctx->invalid_cursors = nullptr; assert(sess_ctx->cursors_map->size() == 0); sess_ctx->msg_buf = nullptr; + sess_ctx->set_flag = 0; thd_set_ha_data(thd, hton, sess_ctx); } return sess_ctx; @@ -1528,7 +1532,7 @@ static int ctc_start_trx_and_assign_scn( int isolation_level = isolation_level_to_cantian(mysql_isolation); uint32_t lock_wait_timeout = THDVAR(thd, lock_wait_timeout); ctc_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, false}; - bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); + bool is_mysql_local = (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED); ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&tch, trx_context, is_mysql_local); if (ret != CT_SUCCESS) { ctc_log_error("start trx failed with error code: %d", ret); @@ -4294,7 +4298,7 @@ int ha_ctc::start_stmt(THD *thd, thr_lock_type) { ctc_trx_context_t trx_context = {isolation_level, autocommit, lock_wait_timeout, m_select_lock == lock_mode::EXCLUSIVE_LOCK}; - bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); + bool is_mysql_local = (sess_ctx->set_flag & CTC_DDL_LOCAL_ENABLED); ct_errno_t ret = (ct_errno_t)ctc_trx_begin(&m_tch, trx_context, is_mysql_local); check_error_code_to_mysql(ha_thd(), &ret); diff --git a/storage/ctc/ha_ctc.h b/storage/ctc/ha_ctc.h index 43244b3..35f889d 100644 --- a/storage/ctc/ha_ctc.h +++ b/storage/ctc/ha_ctc.h @@ -191,6 +191,12 @@ enum class mysql_run_mode { DOUBLE }; +enum set_opt_flag { + CTC_DDL_LOCAL_ENABLED = 1 << 0, + CTC_DDL_ENABLED = 1 << 1, + CTC_REPLAY_DDL = 1 << 2 +}; + typedef int (*ctc_prefetch_fn)(ctc_handler_t *tch, uint8_t *records, uint16_t *record_lens, uint32_t *recNum, uint64_t *rowids, int32 max_row_size); @@ -977,6 +983,7 @@ typedef struct { std::unordered_map *cursors_map; std::vector *invalid_cursors; void* msg_buf; + uint set_flag; } thd_sess_ctx_s; #pragma pack() diff --git a/storage/ctc/ha_ctc_ddl.h b/storage/ctc/ha_ctc_ddl.h index 9bce090..910279f 100644 --- a/storage/ctc/ha_ctc_ddl.h +++ b/storage/ctc/ha_ctc_ddl.h @@ -177,6 +177,12 @@ static map g_ctc_alter_tab {ALTER_TABLESPACE_OPTIONS, CTC_ALTSPACE_SET_AUTOEXTEND}, // option 只有auto extend适配 }; +static const std::unordered_map user_var_flag_map = { + {"ctc_ddl_local_enabled", CTC_DDL_LOCAL_ENABLED}, + {"ctc_ddl_enabled", CTC_DDL_ENABLED}, + {"ctc_replay_ddl", CTC_REPLAY_DDL} +}; + class ctc_ddl_stack_mem { public: ctc_ddl_stack_mem(size_t mem_size):buf_obj(nullptr) { -- Gitee