PyFlink 流批一体以及 BTC.com 在区块链领域的专业实践
概要:
大家好,我们是 BTC.com 团队。 2020 年,我们有幸接触到了 Flink 和 PyFlink 生态,从团队自身需求出发,完善了团队内实时计算的任务和需求,搭建了流批一体的计算环境。
在实现实时计算的过程中,我们在实践中收获了一些经验,在此分享一些这方面的心路历程。
0x01 TOC
- 困惑 • 描述 • 思考 • 行动
- 流批一体的架构
- 架构
- 效果
- zeppelin,PyFlink on k8s 等实践
- zeppelin
- PyFlink on k8s
- 区块链领域实践
- 展望 • 总结
0x02 困惑 • 描述 • 思考 • 行动
作为工程师,我们每天都在不断地了解需求,研发业务。
有一天,我们被拉到了一次团队总结会议上,收到了以下的需求:
销售总监 A:
我们想要知道销售的 历史和实时转化率、销售额,能不能统计一下实时的 TOP5 的商品,还有就是大促时候,用户实时访问、商品实时浏览量 TOP5 的情况呢,可以根据他历史访问的记录实时推荐相关的吗。
市场总监 B:
我们想要知道市场推广的效果,每次活动的实时数据,不然我们的市场投放无法准确评估效果,及时反馈啊。
研发总监 C:
有些用户的 bug 无法复现,日志可以再实时一点吗?传统日志分析,需要一定的梳理,可不可以直接清洗 / 处理相关的数据?
采购总监 D:
这些年是不是流行数字化,采购这边想预测采购需求,做一下实时分类和管理支出,预测未来供应来源,完善一下成本。这个有办法做吗?还有有些供应商不太稳定啊,能监控到他们的情况吗?
运维总监 E:
网站有时候访问比较慢,没有地方可以看到实时的机器情况,搞个什么监控大屏,这个有办法解决吗?
部门领导 F:
可以实现上面的人的需求吗。
做以上的了解之后,才发现,大家对于数据需求的渴望程度,使用方不仅需要历史的数据,而且还需要实时性的数据。
在电商、金融、制造等行业,数据有着迅猛的增长,诸多的企业面临着的新的挑战,数据分析的实时处理框架,比如说做一些实时数据分析报表、实时数据处理计算等。
和大多数企业类似,在此之前,我们是没有实时计算这方面的经验和积累的。这时,就开始困惑了,怎样可以更好地做上面的需求,在成本和效果之间取得平衡,如何设计相关的架构。

穷则思变,在有了困惑以后,我们就开始准备梳理已有的条件和我们到底需要什么。
首先我们的业务范围主要在区块链浏览器与数据服务、区块链矿池、多币种钱包等。在区块链浏览器的业务里,BTC.com 目前已是全球领先的区块链数据服务平台,矿池业务在业内排行第一,区块链浏览器也是全球前三大浏览器之一。
首先,我们通过 parser 解析区块链上的数据,得到各方面的数据信息,可以分析出每个币种的地址活跃度、地址交易情况、交易流向、参与程度等内容。目前,BTC.com 区块链浏览器与行业内各大矿池和交易所等公司都有相关合作,可以更好地实现一些数据的统计、整理、归纳、输出等。
面向的用户,不仅有专业的区块链开发人员,也有各样的 b 端和 c 端用户,c 端用户可以进行区块链地址的标注,智能合约的运行,查看智能合约相关内容等,以及链上数据的检索和查看。b 端用户则有更专业的支持和指导,提供 API、区块链节点等一些的定制以及交易加速、链上的业务合作、数据定制等。
从数据量级来讲,截至目前,比特币大概有 5 亿笔交易,3000 多万地址,22 亿输出(output:每笔交易的输出),并且还在不断增长中。以太坊的话,则更多。而 BTC.com 的矿池和区块链浏览器都支持多币种,各币种的总数据量级约为几十 T。
矿池是矿工购买矿机设备后连接到的服务平台,矿工可以通过连接矿池从而获取更稳定的收益。这是一个需要保证 7 * 24 小时稳定的服务,里面有矿机不断地提交其计算好的矿池下发的任务的解,矿池将达到网络难度的解进行广播。这个过程也可以认为是近乎是实时的,矿机通过提交到服务器,服务器内部再提交到 kafka 消息队列,同时有一些组件监听这些消息进行消费。而这些提交上来的解可以从中分析出矿机的工作状态、算力、连接情况等。
在业务上,我们需 要进行历史数据和实时数据的计算。
历史数据要关联一些币价,历史交易信息,而这些交易信息需要一直保存,是一种典型的批处理任务。
每当有新区块的确认,就有一些数据可以得到处理和分析,比如某个地址在这个区块里发生了一笔交易,那么可以从其交易流向去分析是什么样的交易,挖掘交易相关性。或者是在这个区块里有一些特殊的交易,比如 segwit 的交易、比如闪电网络的交易,就是有一些这个币种特有的东西可以进行解析分析和统计。并且在新区块确认时的难度预测也有所变化。
还有就是大额交易的监控,通过新区块的确认和未确认交易,锁定一些大额交易,结合地址的一些标注,锁定交易流向,更好地进行数据分析。
还有是一些区块链方面的 OLAP 方面的需求。

总结了在数据统计方面的需求和问题以后,我们就开始进行思考:什么是最合适的架构,如何让人员参与少,成本低。
解决问题,无非就是提出假设,通过度量,然后刷新认知。

在浏览了一些资料以后,我们认为,大部分的计算框架都是通过输入,进行处理,然后得到输出。首先,我们要获取到数据,这里数据可以从 MySQL 也可以从 Kafka,然后进行计算,这里计算可以是聚合,也可以是 TOP 5 类型的,在实时的话,可能还会有窗口类型的。在计算完之后,将结果做下发,下发到消息渠道和存储,发送到微信或者钉钉,落地到 MySQL 等。
团队一开始尝试了 spark,搭建了 yarn,使用了 airflow 作为调度框架,通过做 MySQL 的集成导 入,开发了一些批处理任务,有着离线任务的特点,数据固定,量大,计算周期长,需要做一些复杂操作。
在一些批处理任务上,这种架构是稳定的,但是随着业务的发展,有了越来越多的实时的需求,并且实时的数据并不能保证按顺序到达,按时间戳排序,消息的时间字段是允许前后有差距的。在数据模型上,需求驱动式的开发,成本相对来说,spark 的方式对于当时来说较高,对于状态的处理不是很好,导致影响一部分的效率。
其实在 2019 年的时候,就有在调研一些实时计算的事情,关注到了 Flink 框架,当时还是以 java 为主,整体框架概念上和 spark 不同,认为批处理是一种特殊的流,但是因为团队没有 java 方面的基因和沉淀,使用 Flink 作为实时计算的架构,在当时就暂告一个段落。在 2020 年初的时候,不管是阿里云还是 infoq,还是 b 站,都有在推广 PyFlink,而且当时尤其是程鹤群和孙金城的视频以及孙金城老师的博客的印象深刻。于是就想尝试 PyFlink,其有着流批一体的优势,而且还支持 Python 的一些函数,支持 pandas,甚至以后还可以支持 tensorflow、keras,这对我们的吸引力是巨大的。在之后,就在构思我们的在 PyFlink 上的流批一体的架构。