diff --git a/mysql-test/mysql-source-code-meta.patch b/mysql-test/mysql-source-code-meta.patch index bca87731e65301ba860e3a4e2906d8fa45aa08a6..5daca0ed6d492efc9d3c53e8690bd0bda375a282 100644 --- a/mysql-test/mysql-source-code-meta.patch +++ b/mysql-test/mysql-source-code-meta.patch @@ -109,6 +109,30 @@ index dbd6c18..4666753 100644 /** Remove table statistics entries from mysql.table_stats and +diff --git a/sql/dd/dd_resource_group.cc b/sql/dd/dd_resource_group.cc +index ab99cee..933b86d 100644 +--- a/sql/dd/dd_resource_group.cc ++++ b/sql/dd/dd_resource_group.cc +@@ -32,6 +32,7 @@ + #include "sql/sql_class.h" // THD + #include "sql/thd_raii.h" + #include "sql/transaction.h" // trans_commit ++#include "sql/mysqld.h" + + namespace dd { + +@@ -112,6 +113,11 @@ bool create_resource_group(THD *thd, + + bool update_resource_group(THD *thd, const String_type &resource_grp_name, + const resourcegroups::Resource_group &res_grp_ref) { ++ int cluster_role = tse_hton->get_cluster_role(); ++ if (cluster_role == 1) { ++ // Do not update resrouce_group table when starting on slave cluster. ++ return false; ++ } + DBUG_TRACE; + + dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client()); diff --git a/sql/dd/dd_table.cc b/sql/dd/dd_table.cc index 7bf60f4..7be1743 100644 --- a/sql/dd/dd_table.cc @@ -156,10 +180,18 @@ index 7f9b9df..1d5575d 100644 LogErr(WARNING_LEVEL, ER_SKIP_UPDATING_METADATA_IN_SE_RO_MODE, schema_name_abbrev); diff --git a/sql/dd/impl/bootstrap/bootstrapper.cc b/sql/dd/impl/bootstrap/bootstrapper.cc -index d08718a..6440b0b 100644 +index d08718a..6f0d3ca 100644 --- a/sql/dd/impl/bootstrap/bootstrapper.cc +++ b/sql/dd/impl/bootstrap/bootstrapper.cc -@@ -81,7 +81,7 @@ namespace { +@@ -71,6 +71,7 @@ + #include "sql/mysqld.h" + #include "sql/sd_notify.h" // sysd::notify + #include "sql/thd_raii.h" ++#include "sql/mysqld.h" + + using namespace dd; + +@@ -81,7 +82,7 @@ namespace { // Initialize recovery in the DDSE. bool DDSE_dict_recover(THD *thd, dict_recovery_mode_t dict_recovery_mode, uint version) { @@ -168,7 +200,7 @@ index d08718a..6440b0b 100644 if (ddse->dict_recover == nullptr) return true; bool error = ddse->dict_recover(dict_recovery_mode, version); -@@ -266,6 +266,9 @@ bool acquire_exclusive_mdl(THD *thd) { +@@ -266,6 +267,9 @@ bool acquire_exclusive_mdl(THD *thd) { reset ID, store persistently, and update the storage adapter. */ bool flush_meta_data(THD *thd) { @@ -178,7 +210,7 @@ index d08718a..6440b0b 100644 // Acquire exclusive meta data locks for the relevant DD objects. if (acquire_exclusive_mdl(thd)) return true; -@@ -573,6 +576,9 @@ bool flush_meta_data(THD *thd) { +@@ -573,6 +577,9 @@ bool flush_meta_data(THD *thd) { if (persisted_dd_table != nullptr) delete persisted_dd_table; } @@ -188,7 +220,7 @@ index d08718a..6440b0b 100644 bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::SYNCED); return dd::end_transaction(thd, false); -@@ -580,6 +586,8 @@ bool flush_meta_data(THD *thd) { +@@ -580,6 +587,8 @@ bool flush_meta_data(THD *thd) { // Insert additional data into the DD tables. bool populate_tables(THD *thd) { @@ -197,7 +229,7 @@ index d08718a..6440b0b 100644 // Iterate over DD tables, populate tables. for (System_tables::Const_iterator it = System_tables::instance()->begin(); it != System_tables::instance()->end(); ++it) { -@@ -610,6 +618,9 @@ bool populate_tables(THD *thd) { +@@ -610,6 +619,9 @@ bool populate_tables(THD *thd) { bootstrap::DD_bootstrap_ctx::instance().set_stage( bootstrap::Stage::POPULATED); @@ -207,16 +239,21 @@ index d08718a..6440b0b 100644 return false; } -@@ -621,7 +632,7 @@ bool repopulate_charsets_and_collations(THD *thd) { +@@ -621,7 +633,12 @@ bool repopulate_charsets_and_collations(THD *thd) { to retrieve the handlerton for the DDSE should be replaced by a more generic mechanism. */ - handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB); ++ int cluster_role = tse_hton->get_cluster_role(); ++ if (cluster_role == 1) { ++ // Do not update the charset table in slave cluster. ++ return false; ++ } + handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_CTC); if (ddse->is_dict_readonly && ddse->is_dict_readonly()) { LogErr(WARNING_LEVEL, ER_DD_NO_WRITES_NO_REPOPULATION, "InnoDB", " "); return false; -@@ -724,7 +735,7 @@ namespace bootstrap { +@@ -724,7 +741,7 @@ namespace bootstrap { predefined tables and tablespaces. */ bool DDSE_dict_init(THD *thd, dict_init_mode_t dict_init_mode, uint version) { @@ -225,7 +262,7 @@ index d08718a..6440b0b 100644 /* The lists with element wrappers are mem root allocated. The wrapped -@@ -908,6 +919,10 @@ bool initialize(THD *thd) { +@@ -908,6 +925,10 @@ bool initialize(THD *thd) { // Normal server restart. bool restart(THD *thd) { @@ -236,7 +273,7 @@ index d08718a..6440b0b 100644 bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::STARTED); /* -@@ -946,7 +961,9 @@ bool restart(THD *thd) { +@@ -946,7 +967,9 @@ bool restart(THD *thd) { dd::execute_query(thd, "DROP SCHEMA schema_read_only") || dd::execute_query(thd, "CREATE TABLE IF NOT EXISTS S.restart(i INT)")) assert(false);); @@ -247,7 +284,7 @@ index d08718a..6440b0b 100644 bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::FINISHED); LogErr(INFORMATION_LEVEL, ER_DD_VERSION_FOUND, d->get_actual_dd_version(thd)); -@@ -1502,7 +1519,7 @@ bool sync_meta_data(THD *thd) { +@@ -1502,7 +1525,7 @@ bool sync_meta_data(THD *thd) { return true; // Reset the DDSE local dictionary cache. @@ -256,7 +293,7 @@ index d08718a..6440b0b 100644 if (ddse->dict_cache_reset == nullptr) return true; for (System_tables::Const_iterator it = System_tables::instance()->begin(); -@@ -1797,7 +1814,7 @@ bool update_versions(THD *thd, bool is_dd_upgrade_57) { +@@ -1797,7 +1820,7 @@ bool update_versions(THD *thd, bool is_dd_upgrade_57) { back in case of an abort, so this better be the last step we do before committing. */ @@ -973,7 +1010,7 @@ index 217a0c1..fe900eb 100644 if (unlikely(error)) return error; diff --git a/sql/handler.h b/sql/handler.h -index 0530ca1..214027d 100644 +index 0530ca1..bab9b08 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -662,7 +662,8 @@ enum legacy_db_type { @@ -1004,7 +1041,7 @@ index 0530ca1..214027d 100644 enum_binlog_command binlog_command, const char *query, uint query_length, const char *db, const char *table_name); -@@ -2133,6 +2134,13 @@ typedef bool (*check_fk_column_compat_t)( +@@ -2133,6 +2134,14 @@ typedef bool (*check_fk_column_compat_t)( typedef bool (*is_reserved_db_name_t)(handlerton *hton, const char *name); @@ -1014,11 +1051,12 @@ index 0530ca1..214027d 100644 +typedef int (*get_metadata_status_t)(); +typedef int (*op_before_load_meta_t)(THD *thd); +typedef int (*op_after_load_meta_t)(THD *thd); ++typedef int (*get_cluster_role_t)(); + /** Prepare the secondary engine for executing a statement. This function is called right after the secondary engine TABLE objects have been opened by -@@ -2420,6 +2428,8 @@ struct handlerton { +@@ -2420,6 +2429,8 @@ struct handlerton { */ uint savepoint_offset; @@ -1027,7 +1065,7 @@ index 0530ca1..214027d 100644 /* handlerton methods */ close_connection_t close_connection; -@@ -2464,6 +2474,14 @@ struct handlerton { +@@ -2464,6 +2475,15 @@ struct handlerton { dict_set_server_version_t dict_set_server_version; is_reserved_db_name_t is_reserved_db_name; @@ -1038,11 +1076,12 @@ index 0530ca1..214027d 100644 + get_metadata_status_t get_metadata_status; + op_before_load_meta_t op_before_load_meta; + op_after_load_meta_t op_after_load_meta; ++ get_cluster_role_t get_cluster_role; + /** Global handler flags. */ uint32 flags{0}; -@@ -4482,7 +4500,7 @@ class handler { +@@ -4482,7 +4502,7 @@ class handler { and delete_row() below. */ int ha_external_lock(THD *thd, int lock_type); @@ -1051,7 +1090,7 @@ index 0530ca1..214027d 100644 /** Update the current row. -@@ -6161,12 +6179,13 @@ class handler { +@@ -6161,12 +6181,13 @@ class handler { } @param buf Buffer to write from. @@ -1066,7 +1105,7 @@ index 0530ca1..214027d 100644 return HA_ERR_WRONG_COMMAND; } -@@ -6178,6 +6197,7 @@ class handler { +@@ -6178,6 +6199,7 @@ class handler { the columns required for the error message are not read, the error message will contain garbage. */ @@ -1074,7 +1113,7 @@ index 0530ca1..214027d 100644 virtual int update_row(const uchar *old_data MY_ATTRIBUTE((unused)), uchar *new_data MY_ATTRIBUTE((unused))) { return HA_ERR_WRONG_COMMAND; -@@ -6874,7 +6894,7 @@ void ha_pre_dd_shutdown(void); +@@ -6874,7 +6896,7 @@ void ha_pre_dd_shutdown(void); @retval true Error */ bool ha_flush_logs(bool binlog_group_flush = false); @@ -1083,7 +1122,7 @@ index 0530ca1..214027d 100644 int ha_create_table(THD *thd, const char *path, const char *db, const char *table_name, HA_CREATE_INFO *create_info, bool update_create_info, bool is_temp_table, -@@ -6973,7 +6993,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht, +@@ -6973,7 +6995,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht, int ha_reset_logs(THD *thd); int ha_binlog_index_purge_file(THD *thd, const char *file); void ha_reset_slave(THD *thd); diff --git a/storage/tianchi/ha_tse.cc b/storage/tianchi/ha_tse.cc index 43450a9792a1362a5fdcaa9cb7a6c240ecc0a345..44f04b958eb34337833f613634e9b274e1e80f8f 100644 --- a/storage/tianchi/ha_tse.cc +++ b/storage/tianchi/ha_tse.cc @@ -121,6 +121,8 @@ #include "sql/sql_table.h" #include "sql/mysqld_thd_manager.h" #include "sql/sql_backup_lock.h" +#include "ctc_meta_data.h" +#include "sql/mysqld.h" //------------------------------------------------------------------------------// // SYSTEM VARIABLES // @@ -2759,9 +2761,8 @@ EXTER_ATTACK int ha_tse::update_row(const uchar *old_data, uchar *new_data) { if (!flag.no_foreign_key_check) { flag.no_cascade_check = flag.dd_update ? true : pre_check_for_cascade(true); } - bool is_mysqld_starting = is_starting(); ret = (ct_errno_t)tse_update_row(&m_tch, cantian_new_record_buf_size, tse_buf, - &upd_fields[0], upd_fields.size(), flag, &is_mysqld_starting); + &upd_fields[0], upd_fields.size(), flag); check_error_code_to_mysql(thd, &ret); // 如果m_tse_buf为空,说明tse_buf是动态申请的,在函数退出之前要释放掉 if (m_tse_buf == nullptr) { @@ -3974,27 +3975,6 @@ THR_LOCK_DATA **ha_tse::store_lock(THD *, THR_LOCK_DATA **to, return to; } - -int tse_query_cluster_role(bool *is_slave, bool *cantian_cluster_ready) { - void *shm_inst = get_one_shm_inst(NULL); - query_cluster_role_request *req = (query_cluster_role_request*) alloc_share_mem(shm_inst, sizeof(query_cluster_role_request)); - DBUG_EXECUTE_IF("check_init_shm_oom", { req = NULL; }); - if (req == NULL) { - tse_log_error("alloc shm mem error, shm_inst(%p), size(%lu)", shm_inst, sizeof(query_cluster_role_request)); - return ERR_ALLOC_MEMORY; - } - - int result = ERR_CONNECTION_FAILED; - int ret = tse_mq_deal_func(shm_inst, TSE_FUNC_QUERY_CLUSTER_ROLE, req, nullptr); - if (ret == CT_SUCCESS) { - result = req->result; - *is_slave = req->is_slave; - *cantian_cluster_ready = req->cluster_ready; - } - free_share_mem(shm_inst, req); - - return result; -} int32_t tse_get_cluster_role() { if (cluster_role != (int32_t)dis_cluster_role::DEFAULT) { @@ -4270,8 +4250,34 @@ int tse_set_cluster_role_by_cantian(bool is_slave) { // todo: add mutex for cluster_role if (is_slave) { cluster_role = (int32_t)dis_cluster_role::STANDBY; + if(is_starting() || is_initialize()) { + tse_log_system("[Disaster Recovecy] starting or initializing"); + super_read_only = true; + read_only = true; + opt_readonly = true; + tse_log_system("[Disaster Recovery] set super_read_only = true."); + } else { + tse_ddl_broadcast_request local_req {{0}, {0}, {0}, {0}, 0, 0, 0, 0, {0}}; + memcpy(local_req.user_name, "super_read_only", strlen("super_read_only")); + memcpy(local_req.user_ip, "on", strlen("on")); + ctc_set_sys_var(&local_req); + tse_log_system("[Disaster Recovery] ctc_set_sys_var: local_req->user_ip: %s", local_req.user_ip); + } } else { cluster_role = (int32_t)dis_cluster_role::PRIMARY; + if(is_starting() || is_initialize()) { + tse_log_system("[Disaster Recovecy] starting or initializing"); + super_read_only = false; + read_only = false; + opt_readonly = false; + tse_log_system("[Disaster Recovery] set super_read_only = false."); + } else { + tse_ddl_broadcast_request local_req {{0}, {0}, {0}, {0}, 0, 0, 0, 0, {0}}; + memcpy(local_req.user_name, "super_read_only", strlen("super_read_only")); + memcpy(local_req.user_ip, "off", strlen("off")); + ctc_set_sys_var(&local_req); + tse_log_system("[Disaster Recovery] ctc_set_sys_var: local_req->user_ip: %s", local_req.user_ip); + } } return 0; } @@ -4552,6 +4558,7 @@ static typename std::enable_if::type set_hton_ tse_hton->op_after_load_meta = tse_op_after_load_meta; tse_hton->drop_database = tse_drop_database_with_err; tse_hton->binlog_log_query = tse_binlog_log_query_with_err; + tse_hton->get_cluster_role = tse_get_cluster_role; } template diff --git a/storage/tianchi/ha_tse.h b/storage/tianchi/ha_tse.h index 72074e585476de25d7dd01cb3aa34671bbf3064c..d349c205f14a3f8ded6dff03a7a560b7d43ad6b3 100644 --- a/storage/tianchi/ha_tse.h +++ b/storage/tianchi/ha_tse.h @@ -143,6 +143,8 @@ again. */ } #define IS_METADATA_NORMALIZATION() (tse_get_metadata_switch() == (int32_t)metadata_switchs::MATCH_META) +#define IS_PRIMARY_ROLE() (tse_get_cluster_role() == (int32_t)dis_cluster_role::PRIMARY) +#define IS_STANDBY_ROLE() (tse_get_cluster_role() == (int32_t)dis_cluster_role::STANDBY) static const uint ROW_ID_LENGTH = sizeof(uint64_t); static const uint TSE_START_TIMEOUT = 120; // seconds diff --git a/storage/tianchi/srv_mq_msg.h b/storage/tianchi/srv_mq_msg.h index 657d6527ad41444f62e6c9e1ff5607d9a3907631..06d771e645a5d6756d60e65d666b2b5f371e4112 100644 --- a/storage/tianchi/srv_mq_msg.h +++ b/storage/tianchi/srv_mq_msg.h @@ -90,7 +90,6 @@ struct update_row_request { uint16_t col_num; int result; dml_flag_t flag; - bool is_mysqld_starting; }; struct delete_row_request { @@ -245,7 +244,6 @@ struct index_read_request { tse_select_mode_t mode; tse_conds *cond; bool is_replace; - bool is_mysqld_starting; }; struct index_end_request { diff --git a/storage/tianchi/tse_srv.h b/storage/tianchi/tse_srv.h index 4779a5e0bc0c047e579457d6dd17b0550d2302b5..c0edada345d6cd879aebbc05fff56ef7698de33d 100644 --- a/storage/tianchi/tse_srv.h +++ b/storage/tianchi/tse_srv.h @@ -517,7 +517,7 @@ int tse_write_through_row(tianchi_handler_t *tch, const record_info_t *record_in int tse_bulk_write(tianchi_handler_t *tch, const record_info_t *record_info, uint64_t rec_num, uint32_t *err_pos, dml_flag_t flag, ctc_part_t *part_ids); int tse_update_row(tianchi_handler_t *tch, uint16_t new_record_len, const uint8_t *new_record, - const uint16_t *upd_cols, uint16_t col_num, dml_flag_t flag, bool *is_mysqld_starting); + const uint16_t *upd_cols, uint16_t col_num, dml_flag_t flag); int tse_delete_row(tianchi_handler_t *tch, uint16_t record_len, dml_flag_t flag); int tse_rnd_init(tianchi_handler_t *tch, expected_cursor_action_t action, tse_select_mode_t mode, tse_conds *cond); @@ -607,6 +607,7 @@ int tse_update_mysql_dd_cache(char *sql_str); int tse_set_cluster_role_by_cantian(bool is_slave); int ctc_record_sql_for_cantian(tianchi_handler_t *tch, tse_ddl_broadcast_request *broadcast_req, bool allow_fail); +int tse_query_cluster_role(bool *is_slave, bool *cantian_cluster_ready); #ifdef __cplusplus } #endif diff --git a/storage/tianchi/tse_srv_mq_stub.cc b/storage/tianchi/tse_srv_mq_stub.cc index 310fb47c3de703605b89838608d76cca57bd086e..6aaaa24740e4d14a2bfac8a5dc487001e420689d 100644 --- a/storage/tianchi/tse_srv_mq_stub.cc +++ b/storage/tianchi/tse_srv_mq_stub.cc @@ -35,7 +35,7 @@ } while (0) // 双进程模式在 tse_init 中已经提前获取 inst_id -int tse_alloc_inst_id(uint32_t *inst_id) { +int tse_alloc_inst_id(uint32_t *inst_id) { *inst_id = ha_tse_get_inst_id(); return tse_mq_register_func(); } @@ -189,7 +189,7 @@ int tse_bulk_write(tianchi_handler_t *tch, const record_info_t *record_info, uin } int tse_update_row(tianchi_handler_t *tch, uint16_t new_record_len, const uint8_t *new_record, - const uint16_t *upd_cols, uint16_t col_num, dml_flag_t flag, bool *is_mysqld_starting) { + const uint16_t *upd_cols, uint16_t col_num, dml_flag_t flag) { assert(new_record_len < BIG_RECORD_SIZE); assert(col_num <= TSE_MAX_COLUMNS); void *shm_inst = get_one_shm_inst(tch); @@ -203,15 +203,7 @@ int tse_update_row(tianchi_handler_t *tch, uint16_t new_record_len, const uint8_ req->col_num = col_num; req->new_record = const_cast(new_record); req->flag = flag; - req->is_mysqld_starting = *is_mysqld_starting; - /* - The MySQL would try to update tables in starting progress: - 1. mysql.charset 2. mysql.resource_groups - If the Cantian is in slave-cluster (read-only) this would obviously cause error. - So we should not allow the MySQL update tables in strating progress on a slave-cantian-cluster. - The newly added flag is to inform cantian, the mysqld is starting . - Cantian would return CT_SUCCESS when it is in slave-cluster and mysqld is starting, - */ + memcpy(req->upd_cols, upd_cols, sizeof(uint16_t) * col_num); int result = ERR_CONNECTION_FAILED; int ret = tse_mq_deal_func(shm_inst, TSE_FUNC_TYPE_UPDATE_ROW, req, tch->msg_buf); @@ -522,7 +514,6 @@ int tse_index_read(tianchi_handler_t *tch, record_info_t *record_info, index_key req->cond = cond; req->is_replace = is_replace; req->result = 0; - req->is_mysqld_starting = is_starting(); int result = ERR_CONNECTION_FAILED; int ret = tse_mq_deal_func(shm_inst, TSE_FUNC_TYPE_INDEX_READ, req, tch->msg_buf); @@ -1466,3 +1457,25 @@ int ctc_record_sql_for_cantian(tianchi_handler_t *tch, tse_ddl_broadcast_request free_share_mem(shm_inst, req); return result; } + +int tse_query_cluster_role(bool *is_slave, bool *cantian_cluster_ready) { + void *shm_inst = get_one_shm_inst(NULL); + query_cluster_role_request *req = (query_cluster_role_request*) alloc_share_mem(shm_inst, sizeof(query_cluster_role_request)); + DBUG_EXECUTE_IF("check_init_shm_oom", { req = NULL; }); + if (req == NULL) { + tse_log_error("alloc shm mem error, shm_inst(%p), size(%lu)", shm_inst, sizeof(query_cluster_role_request)); + return ERR_ALLOC_MEMORY; + } + + int result = ERR_CONNECTION_FAILED; + int ret = tse_mq_deal_func(shm_inst, TSE_FUNC_QUERY_CLUSTER_ROLE, req, nullptr); + if (ret == CT_SUCCESS) { + result = req->result; + *is_slave = req->is_slave; + *cantian_cluster_ready = req->cluster_ready; + } + free_share_mem(shm_inst, req); + tse_log_system("[Disaster Recovery] is_slave: %d", *is_slave); + + return result; +}