溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python怎么定時(shí)從Mysql提取數(shù)據(jù)存入Redis

發(fā)布時(shí)間:2020-07-29 09:50:15 來源:億速云 閱讀:335 作者:小豬 欄目:開發(fā)技術(shù)

這篇文章主要講解了Python怎么定時(shí)從Mysql提取數(shù)據(jù)存入Redis,內(nèi)容清晰明了,對(duì)此有興趣的小伙伴可以學(xué)習(xí)一下,相信大家閱讀完之后會(huì)有幫助。

設(shè)計(jì)思路:

1.程序一旦run起來,python會(huì)把mysql中最近一段時(shí)間的數(shù)據(jù)全部提取出來

2.然后實(shí)例化redis類,將數(shù)據(jù)簡單解析后逐條傳入redis隊(duì)列

3.定時(shí)器設(shè)計(jì)每天凌晨12點(diǎn)開始跑

ps:redis是個(gè)內(nèi)存數(shù)據(jù)庫,做后臺(tái)消息隊(duì)列的緩存時(shí)有很大的用處,有興趣的小伙伴可以去查看相關(guān)的文檔。

 # -*- coding:utf-8 -*- 

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# get the data from mysql
class FromSql(object):
  def __init__(self, conn):
    self.conn = conn

  def acquire(self):
    cursor = self.conn.cursor()
    try:
      sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

      cursor.execute(sql)
      rs = cursor.fetchall()
      #print (rs)
      for eve in rs:

        print('%s, %s, %s, %s' % eve)
      copy_rs = rs
      cursor.close()

      return copy_rs 

    except Exception as e:
      print("The error: %s" % e)


class RedisQueue(object):

  def __init__(self, name, namespace='queue', **redis_kwargs):
    """The default connection parameters are: host='localhost', port=6379, db=0"""
    self.__db= redis.Redis(**redis_kwargs)
    self.key = '%s:%s' %(namespace, name)

  def qsize(self):
    return self.__db.llen(self.key)

  def put(self, item):
    self.__db.rpush(self.key, item)

  def get(self, block=True, timeout=None):

    if block:
      item = self.__db.blpop(self.key, timeout=timeout)
    else:
      item = self.__db.lpop(self.key)

    if item:
      item = item[1]
    return item

  def get_nowait(self):
    return self.get(False)


if __name__ == "__main__":
  # connect mysqldb
  conn_sql = MySQLdb.connect(
            host = '127.0.0.1',
            port = 3306,
            user = 'root',
            passwd = '',
            db = 'test',
            charset = 'utf8'
            )


def job_for_redis():
    get_data = FromSql(conn_sql)
    data = get_data.acquire()

    q = RedisQueue('test',host='localhost', port=6379, db=0)
    for single_data in data:
      for meta_data in single_data:
        q.put(meta_data)
        print(meta_data)
    print("All data had been inserted.") 

"""
  try:
    schedule.every().day.at("00:00").do(job_for_redis)
  except Exception as e:
    print('Error: %s'% e)
#  finally:
#    conn.close()

  while True:
    schedule.run_pending()
    time.sleep(1)
"""

補(bǔ)充知識(shí):python定時(shí)獲取匯率存入數(shù)據(jù)庫

python定時(shí)任務(wù):

我們可以使用 輕量級(jí)的第三方模塊schedule。首先先安裝:pip install schedule

定時(shí)任務(wù)的的小測(cè)試:

import schedule
import time
 
def job():
  print("I'm working...")
 
schedule.every(10).minutes.do(job)       # 每隔10分鐘執(zhí)行一次任務(wù)
schedule.every().hour.do(job)          # 每隔一小時(shí)執(zhí)行一次任務(wù)
schedule.every().day.at("10:30").do(job)    # 每天10:30執(zhí)行一次任務(wù)
schedule.every(5).to(10).days.do(job)      # 每5-10天執(zhí)行一次任務(wù)
schedule.every().monday.do(job)         # 每周一的這個(gè)時(shí)候執(zhí)行一次任務(wù)
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15執(zhí)行一次任務(wù)
 
while True:
  schedule.run_pending()

獲取數(shù)據(jù)存入數(shù)據(jù)庫:(格式可能不太對(duì),還有一些符號(hào)。自己修改一下即可)

import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine

#獲取美元的所有外匯
def job():
  content = '美元'
  url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯數(shù)據(jù)地址
  html = requests.get(url).content.decode('utf-8')

  index = html.index('<td>' + content + '</td>')
  str = html[index:index+300]
  result = re.findall('<td>(.*&#63;)</td>',str)

  print("幣種:" + result[0])
  print("現(xiàn)匯買入價(jià):" + result[1])
  print("現(xiàn)鈔買入價(jià):" + result[2])
  print("現(xiàn)匯賣出價(jià):" + result[3])
  print("現(xiàn)鈔賣出價(jià):" + result[4])
  print("中行結(jié)算價(jià):" + result[5])
  print("發(fā)布時(shí)間:" + result[6] + ' ' + result[7])
  
 #本地地址 數(shù)據(jù)庫賬號(hào) 密碼  數(shù)據(jù)庫名
  db = pymysql.connect('localhost','root','root','pinyougoudb')
  cursor = db.cursor()
  
 #sql語句
  sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])

  cursor.execute(sql)
  db.commit()
  print('success')

 # 查詢語句,將存入的數(shù)據(jù)查出來
  # sqlalchemy 進(jìn)行數(shù)據(jù)庫初始化
  engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
  sql = '''select * from tb_money'''

  # pandas 進(jìn)行數(shù)據(jù)庫讀寫
  df = pandas.read_sql_query(sql,engine)
  print(df)

  db.commit()


# 每隔幾分中刷新一次
#schedule.every(0.1).minutes.do(job)

#每天什么時(shí)候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)

#一直循環(huán) 知道滿足條件執(zhí)行
while True:
  schedule.run_pending()

看完上述內(nèi)容,是不是對(duì)Python怎么定時(shí)從Mysql提取數(shù)據(jù)存入Redis有進(jìn)一步的了解,如果還想學(xué)習(xí)更多內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI