Synchronizing MySQL data to a Redis cache in real time can improve an application's performance and responsiveness. The following are several common ways to achieve this.
A message queue is an asynchronous processing mechanism that can be used to decouple the data synchronization between MySQL and Redis: a producer reads rows from MySQL and publishes them to a queue, and a consumer takes them off the queue and writes them into Redis.
# Producer script: read rows from MySQL and publish them to RabbitMQ as JSON
import json

import mysql.connector
import pika

def send_to_queue(data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='mysql_to_redis')
    # The message body must be bytes, so serialize the row dict as JSON
    channel.basic_publish(exchange='', routing_key='mysql_to_redis',
                          body=json.dumps(data, default=str).encode('utf-8'))
    connection.close()

def fetch_data_from_mysql():
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    # dictionary=True returns each row as a dict keyed by column name
    cursor = cnx.cursor(dictionary=True)
    cursor.execute("SELECT * FROM mytable")
    for row in cursor.fetchall():
        send_to_queue(row)
    cursor.close()
    cnx.close()

if __name__ == "__main__":
    fetch_data_from_mysql()
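Note that send_to_queue opens and closes a new RabbitMQ connection for every row, which keeps the example short; in a real deployment the connection and channel would normally be created once and reused across messages.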
# Consumer script: read messages from RabbitMQ and write them into Redis
import json

import pika
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def callback(ch, method, properties, body):
    # Deserialize the JSON row published by the producer
    data = json.loads(body)
    # Use the row's primary key as the Redis key and store the row as JSON
    r.set(f"mytable:{data['id']}", json.dumps(data))

def consume_from_queue():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='mysql_to_redis')
    channel.basic_consume(queue='mysql_to_redis', on_message_callback=callback, auto_ack=True)
    print('Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == "__main__":
    consume_from_queue()
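As written, the producer pushes a full snapshot of the table, so it is better suited to an initial load than to real-time updates. To keep Redis current, the application code that writes to MySQL can publish a change event to the same queue immediately after each write. Below is a minimal sketch of that idea; the save_user helper and the mytable(id, name) schema are assumptions made for illustration, not part of the original example.
# Sketch: publish a change event right after writing to MySQL.
# save_user and the mytable(id, name) schema are illustrative assumptions.
import json

import mysql.connector
import pika

def publish_change(row):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='mysql_to_redis')
    channel.basic_publish(exchange='', routing_key='mysql_to_redis',
                          body=json.dumps(row, default=str).encode('utf-8'))
    connection.close()

def save_user(user_id, name):
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    cursor = cnx.cursor()
    cursor.execute(
        "INSERT INTO mytable (id, name) VALUES (%s, %s) "
        "ON DUPLICATE KEY UPDATE name = VALUES(name)",
        (user_id, name),
    )
    cnx.commit()
    cursor.close()
    cnx.close()
    # Publish only after the transaction has committed, so Redis never
    # sees a row that MySQL later rolls back.
    publish_change({'id': user_id, 'name': name})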
By setting up triggers and a log table in MySQL, change information can be recorded automatically whenever data is modified, and those records can then be synchronized to Redis.
# Create the change-log table
import mysql.connector

def create_log_table():
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    cursor = cnx.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS log_table (
            id INT AUTO_INCREMENT PRIMARY KEY,
            table_name VARCHAR(255),
            action VARCHAR(255),
            old_data TEXT,
            new_data TEXT,
            changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    cursor.close()
    cnx.close()
# Create the triggers
def create_trigger():
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    cursor = cnx.cursor()
    cursor.execute("""
        CREATE TRIGGER after_insert_mytable
        AFTER INSERT ON mytable
        FOR EACH ROW
        INSERT INTO log_table (table_name, action, old_data, new_data)
        VALUES ('mytable', 'INSERT', NULL, JSON_OBJECT('id', NEW.id, 'name', NEW.name))
    """)
    cursor.execute("""
        CREATE TRIGGER after_update_mytable
        AFTER UPDATE ON mytable
        FOR EACH ROW
        INSERT INTO log_table (table_name, action, old_data, new_data)
        VALUES ('mytable', 'UPDATE', JSON_OBJECT('id', OLD.id, 'name', OLD.name), JSON_OBJECT('id', NEW.id, 'name', NEW.name))
    """)
    cursor.execute("""
        CREATE TRIGGER after_delete_mytable
        AFTER DELETE ON mytable
        FOR EACH ROW
        INSERT INTO log_table (table_name, action, old_data, new_data)
        VALUES ('mytable', 'DELETE', JSON_OBJECT('id', OLD.id, 'name', OLD.name), NULL)
    """)
    cursor.close()
    cnx.close()
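One caveat: CREATE TRIGGER fails if a trigger with the same name already exists, so running create_trigger() a second time raises an error. A small sketch of one way to make the setup repeatable by dropping the triggers first (the trigger names match those created above):
# Sketch: drop the triggers if they exist so the setup script can be re-run.
def drop_triggers():
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    cursor = cnx.cursor()
    for name in ('after_insert_mytable', 'after_update_mytable', 'after_delete_mytable'):
        cursor.execute(f"DROP TRIGGER IF EXISTS {name}")
    cursor.close()
    cnx.close()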
# Sync script: replay the change log into Redis
import json

import mysql.connector
import redis

def sync_from_log():
    cnx = mysql.connector.connect(user='user', password='password',
                                  host='127.0.0.1', database='mydatabase')
    # dictionary=True gives named access to the log columns
    cursor = cnx.cursor(dictionary=True)
    cursor.execute("SELECT * FROM log_table ORDER BY id")
    r = redis.Redis(host='localhost', port=6379, db=0)
    for row in cursor.fetchall():
        if row['action'] == 'DELETE':
            # Remove the cached entry for deleted rows
            old = json.loads(row['old_data'])
            r.delete(f"{row['table_name']}:{old['id']}")
        else:
            # INSERT and UPDATE both store the new row state
            new = json.loads(row['new_data'])
            r.set(f"{row['table_name']}:{new['id']}", row['new_data'])
    cursor.close()
    cnx.close()

if __name__ == "__main__":
    create_log_table()
    create_trigger()
    sync_from_log()
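sync_from_log replays the entire log table on every run. To approach real-time behavior, the log can instead be polled incrementally, remembering the id of the last entry that was applied. Below is a minimal sketch of such a loop, assuming the last processed id is kept in Redis under a hypothetical key mytable:last_log_id.
# Sketch: poll the log table and apply only entries newer than the last one
# processed. The Redis key 'mytable:last_log_id' is an illustrative assumption.
import json
import time

import mysql.connector
import redis

def sync_incrementally(poll_interval=1.0):
    r = redis.Redis(host='localhost', port=6379, db=0)
    while True:
        last_id = int(r.get('mytable:last_log_id') or 0)
        cnx = mysql.connector.connect(user='user', password='password',
                                      host='127.0.0.1', database='mydatabase')
        cursor = cnx.cursor(dictionary=True)
        cursor.execute("SELECT * FROM log_table WHERE id > %s ORDER BY id", (last_id,))
        for row in cursor.fetchall():
            if row['action'] == 'DELETE':
                old = json.loads(row['old_data'])
                r.delete(f"{row['table_name']}:{old['id']}")
            else:
                new = json.loads(row['new_data'])
                r.set(f"{row['table_name']}:{new['id']}", row['new_data'])
            # Remember the last log entry applied so nothing is replayed twice
            r.set('mytable:last_log_id', row['id'])
        cursor.close()
        cnx.close()
        time.sleep(poll_interval)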
There are also third-party tools that can help implement real-time MySQL-to-Redis synchronization, such as binlog-based change data capture tools (Canal and Debezium are common examples).
Each of the approaches above has its own strengths and weaknesses, and which one to choose depends on the specific scenario and requirements. The message-queue approach offers efficient asynchronous processing, the trigger-and-log-table approach gives precise control over what gets synchronized, and third-party tools provide a quick, ready-made solution.