让代码更简单

[Python源码]股票价格监听并推送

重要:本文最后更新于2023-08-23 21:55:52,某些文章具有时效性,若有错误或已失效,请在下方留言或联系代码狗

最近比较忙,没时间看股票,为了能在预期价格买入,我专门写了个监听程序,当股票价格达到预期价格时,将会推送消息到微信,我使用的是自建企业微信,所以不公开此部分代码,改成你自己的就行了。

我是个佛系股民,所以适合这种方式,如果你需要经常操作,不建议使用。

python源码如下,使用3.7版本,股票数据使用的XX财经,不建议使用baostock,数据不及时,建议5分钟执行一次。

复制
import aiohttp,asyncio,json,time,re,os,datetime

class StockListen:
    def __init__(self):
        #定义需要监听的股票代码列表
        self.stock_list = ['1.600050','1.601988','1.601288','1.601939']
        #定义预期价格列表
        self.expect_price = [6.6,3.0,2.7,5]

        #定义数据接口api地址
        self.api_url = 'https://push2his.eastmoney.com/api/qt/stock/kline/get'
        #定义推送地址
        self.wechat_url='你的推送地址'
        #初始化异步请求对象
        self.session = aiohttp.ClientSession()
        #定义本地记录文件路径stock.conf
        self.stock_conf = 'stock.conf'

    async def get_data(self,stock_code):
        #定义请求头
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
            'referer':'https://quote.eastmoney.com'
        }
        #获取当前时间戳
        now_time = int(time.time()*1000)
        #定义请求参数
        params = {
            'cb': 'jQuery351010027202936442903_1692684817016',
            'secid': stock_code,
            'ut': 'fa5fd1943c7b386f172d6893dbfba10b',
            'fields1': 'f1,f2,f3,f4,f5,f6',
            'fields2': 'f51,f52,f53,f54,f55',
            'klt': '101',
            'fqt': '1',
            'end': '20500101',
            'lmt': '1',
            '_': now_time
        }
        #发起异步请求
        async with self.session.get(self.api_url, params=params, headers=headers) as resp:
            #获取响应内容
            data = await resp.text()
            #正则匹配提取jQuery351010027202936442903_1692684817016()中的内容
            data = re.findall('jQuery351010027202936442903_1692684817016\((.*)\);', data)[0]
            #将数据转换为json格式
            data_json = json.loads(data)
            if data_json['data'] is not None:
                return data_json['data']
            else:
                return None
    #企业微信异步推送,这里可以换成你自己的
    async def send_wechat(self,title,content):
        #准备请求数据
        curtime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
        data = "你的数据"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
            'Content-Type': 'application/json'
        }
        #发送请求
        async with self.session.post(url=self.wechat_url,data=json.dumps(data),headers=headers) as response:
            #获取响应数据
            response = await response.text()
            #返回响应数据
            return response
    #遍历股票代码列表
    async def get_stock_code(self):
        #遍历股票代码列表self.stock_list并通过索引下标获取对应价格self.expect_price
        for i in range(len(self.stock_list)):
            time.sleep(5)
            #获取股票代码
            stock_code = self.stock_list[i]
            #获取股票价格
            expect_price = self.expect_price[i]
            #通过get_data方法获得股票数据
            data = await self.get_data(stock_code)
            if data is None:
                return {"code":100,"msg":"数据获取失败"}
            #判断股票价格是否低于预期
            print(data['klines'][0])
            #,分割数据
            klines = data['klines'][0].split(',')
            if float(klines[2]) < float(expect_price):
                #判断今日是否已经推送过
                lastrecord = await self.get_stock_push_time(stock_code)
                if  lastrecord is None:
                    #通过send_wechat方法发送消息
                    response = await self.send_wechat(data['name']+stock_code,klines[2])
                    await self.record_stock_push_time(stock_code)
                    print(response)
                else:
                    current_time = datetime.datetime.now().strftime("%Y-%m-%d")
                    if lastrecord != current_time:
                        #通过send_wechat方法发送消息
                        response = await self.send_wechat(data['name']+stock_code,klines[2])
                        await self.record_stock_push_time(stock_code)
                        print(response)
                    else:
                        print("今日已推送")
            else:
                #如果高于预期则不发送消息
                print(stock_code+'价格未达到预期')

           
    
    async def get_stock_push_time(self,stock_code):
        # 检查文件是否存在
        if os.path.exists("stock.conf"):
            with open("stock.conf", "r") as file:
                lines = file.readlines()
                for line in lines:
                    if line.startswith(stock_code + ","):
                        return line.split(",")[1].strip()
        return None
    
    #记录推送时间到本地文件stock.conf
    async def record_stock_push_time(self,stock_code):
        current_time = datetime.datetime.now().strftime("%Y-%m-%d")
        record = f"{stock_code},{current_time}"

        # 检查文件是否存在
        if os.path.exists("stock.conf"):
            with open("stock.conf", "r") as file:
                lines = file.readlines()
        else:
            lines = []

        # 更新股票推送时间
        found_stock = False
        for i in range(len(lines)):
            if lines[i].startswith(stock_code + ","):
                lines[i] = record + "\n"
                found_stock = True
                break

        # 如果股票代码不存在,则添加新记录
        if not found_stock:
            lines.append(record + "\n")

        # 将记录写入文件
        with open("stock.conf", "w") as file:
            file.writelines(lines)
    

#单例测试
if __name__ == '__main__':
    
    async def ansyc_test():
        #获取股票代码
        stock_code = '1.600050'
        #实例化对象
        eastmoney_stock_data =StockListen()
        #发起异步请求
        data=await eastmoney_stock_data.get_stock_code()
        await eastmoney_stock_data.session.close()
        return data
    result=asyncio.get_event_loop().run_until_complete(ansyc_test())
    print(result)

将上面的代码放在服务器上,再配合宝塔面板等定时任务,即可实现股票购买价位推送。

感觉很棒!可以赞赏支持我哟~

0 打赏

评论 (0)

登录后评论
QQ咨询 邮件咨询 狗哥推荐