clickhouse从MySQL同步数据 mysql clickhouse



1.MySQL引擎

MySQL引擎用于将远程的MySQL服务器中的表映射到clickhouse中,并允许对表进行insert和select操作,以方便在clickhouse与MySQL之间进行数据交换.

MySQL数据库引擎会将对其的查询转换为MySQL语法并发送到MySQL服务器中,因此以执行诸如SHOW TABLES 或 SHOW CREATE TABLE之类的操作.

但无法对其执行以下操作:

ATTACH/DETACH

DROP

RENAME

CREATE TABLE

ALTER

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster] ENGINE = MySQL('host:port','database','user','password')
PS:该database就是MySQL的database的映射,MySQL中的数据发生变化,clickhouse中的数据也会跟着变化.


2.单次单表同步

此为将数据从MySQL的某表中全量同步到clickhouse中,clickhouse中的数据不会随MySQL中数据的变化而变化.

CREATE TABLE tmp ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('hostip:3306','db','table','user','passwd');


3.定时增量同步

mport pymysql
from clickhouse_driver import Client
click_cllient = Client("host","port","db","user","passwd")
mysql_client = pymysql.connect(host='***',port='***',user='***',passwd='***',db='***',charset='utf8')
def get_id():
	click_sql = """select id from test order by id desc limit 1"""
	try:
		list = click_client.execute(click_sql,types_check = True)
		for i in list:
			id = i[0]
			return id
	except Exception as e:
		print(e)
def get_data():
	log_id = get_id()
	cursor = mysql_cliet.cursor()
	sql = """select * from test where id > '%s'"""%log_id
	cursor.execute(sql)
	results = cursor.fetchall()
	mysql_client.close()
	return results
def insert_data(data):
	try:
		click_client.execute('insert into test values',[data],types_check=True)
		return "success!!!"
	except Exception e:
		return(e)
def main():
	print(get_id())
	list = get_data()
	for data in list:
		print(data)
		print(insert_data(data))
if __name__ == '__main__':
	main()


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回