From 2f66554ffb8ae5afa255efea456493f97555d292 Mon Sep 17 00:00:00 2001 From: xicoding Date: Thu, 22 Feb 2024 20:32:19 +0800 Subject: [PATCH] sync rongzai code and problem --- storage/tianchi/datatype_cnvrtr.cc | 31 +++++++++++++++++++--- storage/tianchi/ha_tse.cc | 28 ++++++++++++++----- storage/tianchi/ha_tse.h | 6 ++--- storage/tianchi/tse_ddl_rewriter_plugin.cc | 8 ++++-- storage/tianchi/tse_srv.h | 9 ++++--- 5 files changed, 63 insertions(+), 19 deletions(-) diff --git a/storage/tianchi/datatype_cnvrtr.cc b/storage/tianchi/datatype_cnvrtr.cc index f9fc4a5..f1ec7a5 100644 --- a/storage/tianchi/datatype_cnvrtr.cc +++ b/storage/tianchi/datatype_cnvrtr.cc @@ -1166,11 +1166,12 @@ static void tse_my_timestamp_to_binary(const struct timeval *tm, uchar *ptr, uin mi_int3store(ptr + 4, tm->tv_usec); } } + /** @brief convert datetime data from cantian to mysql */ -static void convert_datetime_to_mysql(const field_cnvrt_aux_t* mysql_info, +static void convert_datetime_to_mysql(tianchi_handler_t &tch, const field_cnvrt_aux_t* mysql_info, uint8_t *mysql_ptr, uchar *cantian_ptr, Field *mysql_field) { if (mysql_info->mysql_field_type == MYSQL_TYPE_DATE) { @@ -1181,7 +1182,7 @@ static void convert_datetime_to_mysql(const field_cnvrt_aux_t* mysql_info, cnvrt_time_decimal(cantian_ptr, DATETIME_MAX_DECIMALS, mysql_ptr, mysql_field->decimals(), mysql_field->pack_length()); return; } - if (mysql_info->mysql_field_type == MYSQL_TYPE_DATETIME) { + if (mysql_info->mysql_field_type == MYSQL_TYPE_DATETIME && !tch.read_only_in_ct) { cnvrt_datetime_decimal(cantian_ptr, DATETIME_MAX_DECIMALS, mysql_ptr, mysql_field->decimals(), mysql_field->pack_length()); return; } @@ -1192,6 +1193,28 @@ static void convert_datetime_to_mysql(const field_cnvrt_aux_t* mysql_info, cm_decode_date(date, &date_detail); switch (mysql_info->mysql_field_type) { + case MYSQL_TYPE_DATETIME: { + // Assigning Values for MYSQL_TIME + MYSQL_TIME ltime; + ltime.day = date_detail.day; + ltime.month = date_detail.mon; + ltime.year = date_detail.year; + ltime.second = date_detail.sec; + ltime.minute = date_detail.min; + ltime.hour = date_detail.hour; + ltime.second_part = 0; + ltime.neg = false; + ltime.time_zone_displacement = 0; + ltime.time_type = MYSQL_TIMESTAMP_DATETIME; + + uchar data[8]; + longlong ll = TIME_to_longlong_datetime_packed(ltime); + int dec = mysql_field->decimals(); + my_datetime_packed_to_binary(ll, data, dec); + + memcpy(mysql_ptr, data, mysql_field->pack_length()); + break; + } case MYSQL_TYPE_YEAR: *(uint8_t *)mysql_ptr = date_detail.year == 0 ? (uint8_t)0 : (uint8_t)(date_detail.year - 1900); break; @@ -1531,11 +1554,11 @@ void copy_column_data_to_mysql(field_info_t *field_info, const field_cnvrt_aux_t break; case DATETIME_DATA: convert_datetime_to_mysql( - mysql_info, field_info->mysql_cur_field, field_info->cantian_cur_field, field_info->field); + tch, mysql_info, field_info->mysql_cur_field, field_info->cantian_cur_field, field_info->field); break; case STRING_DATA: { lob_locator_t *locator = NULL; - if (!VARCHAR_AS_BLOB(field_info->field->row_pack_length())) { + if (!VARCHAR_AS_BLOB(field_info->field->row_pack_length()) || tch.read_only_in_ct) { src = field_info->cantian_cur_field; src_len = field_info->field_len; } else { diff --git a/storage/tianchi/ha_tse.cc b/storage/tianchi/ha_tse.cc index be2400b..43450a9 100644 --- a/storage/tianchi/ha_tse.cc +++ b/storage/tianchi/ha_tse.cc @@ -370,8 +370,12 @@ bool is_alter_table_scan(bool m_error_if_not_empty) { bool ddl_enabled_normal(MYSQL_THD thd) { return !user_var_set(thd, "ctc_ddl_local_enabled") && - (tse_concurrent_ddl == true || - (tse_concurrent_ddl == false && user_var_set(thd, "ctc_ddl_enabled"))); + (tse_concurrent_ddl == true || user_var_set(thd, "ctc_ddl_enabled")); +} + +bool engine_skip_ddl(MYSQL_THD thd) { + // 接口流程不需要走到参天: 用于参天SYS库操作 + return user_var_set(thd, "ctc_ddl_local_enabled") && tse_concurrent_ddl == true; } bool engine_ddl_passthru(MYSQL_THD thd) { @@ -379,8 +383,8 @@ bool engine_ddl_passthru(MYSQL_THD thd) { if (is_meta_version_initialize()) { return false; } - - return is_initialize() || !mysqld_server_started || user_var_set(thd, "ctc_ddl_local_enabled"); + bool is_mysql_local = user_var_set(thd, "ctc_ddl_local_enabled"); + return is_initialize() || !mysqld_server_started || is_mysql_local; } bool ha_tse::is_replay_ddl(MYSQL_THD thd) { @@ -1348,6 +1352,8 @@ void update_member_tch(tianchi_handler_t &tch, handlerton *hton, THD *thd, bool tch.cursor_ref = 0; tch.pre_sess_addr = 0; tch.msg_buf = nullptr; + tch.change_data_capture = 0; + tch.read_only_in_ct = false; return; } @@ -1617,7 +1623,7 @@ static int tse_commit(handlerton *hton, THD *thd, bool commit_trx) { tse_log_error("commit atomic ddl failed with error code: %d", ret); return convert_tse_error_code_to_mysql(ret); } - if (is_ddl_commit) { + if (is_ddl_commit && !engine_skip_ddl(thd)) { tse_ddl_broadcast_request broadcast_req {{0}, {0}, {0}, {0}, 0, 0, 0, 0, {0}}; string sql = string(thd->query().str).substr(0, thd->query().length); FILL_BROADCAST_BASE_REQ(broadcast_req, sql.c_str(), thd->m_main_security_ctx.priv_user().str, @@ -1757,6 +1763,9 @@ static void tse_kill_connection(handlerton *hton, THD *thd) { } static int tse_pre_create_db4cantian(THD *thd, tianchi_handler_t *tch) { + if (engine_skip_ddl(thd)) { + return CT_SUCCESS; + } char user_name[SMALL_RECORD_SIZE]; tse_copy_name(user_name, thd->lex->name.str, SMALL_RECORD_SIZE); int error_code = 0; @@ -1931,6 +1940,10 @@ static bool tse_notify_exclusive_mdl(THD *thd, const MDL_key *mdl_key, } if (!IS_METADATA_NORMALIZATION()) { + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", thd->query().str); + return false; + } if (!ddl_enabled_normal(thd)) { my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); return true; @@ -4189,6 +4202,9 @@ static int tse_check_tx_isolation() { } static int tse_create_db(THD *thd, handlerton *hton) { + if (engine_skip_ddl(thd)) { + return CT_SUCCESS; + } tianchi_handler_t tch; TSE_RETURN_IF_NOT_ZERO(get_tch_in_handler_data(hton, thd, tch)); @@ -4881,7 +4897,7 @@ EXTER_ATTACK int ha_tse::create(const char *name, TABLE *form, HA_CREATE_INFO *c thd->lex->alter_info->requested_algorithm = Alter_info::ALTER_TABLE_ALGORITHM_COPY; } - if (engine_ddl_passthru(thd)) { + if (engine_skip_ddl(thd) || engine_ddl_passthru(thd)) { return ret; } diff --git a/storage/tianchi/ha_tse.h b/storage/tianchi/ha_tse.h index 3dff5e8..72074e5 100644 --- a/storage/tianchi/ha_tse.h +++ b/storage/tianchi/ha_tse.h @@ -333,8 +333,8 @@ public: ulonglong table_flags() const override { DBUG_TRACE; return (HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE | HA_NULL_IN_KEY | HA_DESCENDING_INDEX | - HA_CAN_INDEX_VIRTUAL_GENERATED_COLUMN | HA_GENERATED_COLUMNS | HA_NO_READ_LOCAL_LOCK | - HA_SUPPORTS_DEFAULT_EXPRESSION | HA_CAN_INDEX_BLOBS | HA_CAN_EXPORT | HA_CAN_SQL_HANDLER | + HA_COUNT_ROWS_INSTANT | HA_GENERATED_COLUMNS | HA_NO_READ_LOCAL_LOCK | + HA_SUPPORTS_DEFAULT_EXPRESSION | HA_CAN_INDEX_BLOBS | HA_CAN_SQL_HANDLER | HA_ATTACHABLE_TRX_COMPATIBLE); } @@ -967,8 +967,8 @@ void tse_ddl_hook_cantian_error(const char *tag, THD *thd, ddl_ctrl_t *ddl_ctrl, ct_errno_t *ret); int tse_ddl_handle_fault(const char *tag, const THD *thd, const ddl_ctrl_t *ddl_ctrl, ct_errno_t ret, const char *param = nullptr, int fix_ret = 0); -bool ddl_enabled_local(MYSQL_THD thd); bool ddl_enabled_normal(MYSQL_THD thd); +bool engine_skip_ddl(MYSQL_THD thd); bool engine_ddl_passthru(MYSQL_THD thd); bool is_alter_table_copy(MYSQL_THD thd, const char *name = nullptr); bool is_alter_table_scan(bool m_error_if_not_empty); diff --git a/storage/tianchi/tse_ddl_rewriter_plugin.cc b/storage/tianchi/tse_ddl_rewriter_plugin.cc index 5470b10..089e73b 100644 --- a/storage/tianchi/tse_ddl_rewriter_plugin.cc +++ b/storage/tianchi/tse_ddl_rewriter_plugin.cc @@ -1084,6 +1084,10 @@ bool plugin_ddl_block(MYSQL_THD thd, } if (!IS_METADATA_NORMALIZATION()) { + if (engine_skip_ddl(thd)) { + tse_log_warning("[CTC_NOMETA_SQL]:record sql str only generate metadata. sql:%s", query_str.c_str()); + return false; + } // disallow ddl query if ctc_concurrent_ddl=OFF and tse_enable_ddl not set if (!ddl_enabled_normal(thd)) { my_printf_error(ER_DISALLOWED_OPERATION, "%s", MYF(0), "DDL not allowed in this mode, Please check the value of @@ctc_concurrent_ddl."); @@ -1185,7 +1189,7 @@ static int tse_check_metadata_switch() { static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, const void *event) { - if(is_meta_version_initialize()) { + if (is_meta_version_initialize()) { return 0; } @@ -1198,7 +1202,7 @@ static int tse_ddl_rewrite(MYSQL_THD thd, mysql_event_class_t event_class, enum enum_sql_command sql_cmd = thd->lex->sql_command; auto it = ddl_cmds.find(sql_cmd); - bool need_forward = true; + bool need_forward = !engine_skip_ddl(thd); string query_str = string(event_parse->query.str).substr(0, event_parse->query.length); if (plugin_ddl_passthru(thd, it)) { diff --git a/storage/tianchi/tse_srv.h b/storage/tianchi/tse_srv.h index a9fde1e..4779a5e 100644 --- a/storage/tianchi/tse_srv.h +++ b/storage/tianchi/tse_srv.h @@ -75,18 +75,19 @@ typedef struct { uint32_t thd_id; uint32_t part_id; uint32_t subpart_id; + int64_t query_id; uint64_t sess_addr; uint64_t ctx_addr; uint64_t cursor_addr; + uint64_t pre_sess_addr; int32_t cursor_ref; - bool cursor_valid; uint16_t bind_core; uint8_t sql_command; - int64_t query_id; - uint8_t sql_stat_start; // TRUE when we start processing of an SQL statement + uint8_t sql_stat_start; // TRUE when we start processing of an SQL statement uint8_t change_data_capture; // TRUE when start logicrep in cantian + bool cursor_valid; bool is_broadcast; - uint64_t pre_sess_addr; + bool read_only_in_ct; // TRUE if read only in cantian: sys tables/views void* msg_buf; } tianchi_handler_t; -- Gitee