6.1 分布式 id 生成器

有时我们需要能够生成类似 MySQL 自增 ID 这样不断增大,同时又不会重复的 id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒 10w+。明星出轨时,会有大量热情的粉丝发微博以表心意,同样会在短时间内产生大量的消息。

在插入数据库之前,我们需要给这些消息/订单先打上一个 ID,然后再插入到我们的数据库。对这个 id 的要求是希望其中能带有一些时间信息,这样即使我们后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序。

Twitter 的 snowflake 算法是这种场景下的一个典型解法。先来看看 snowflake 是怎么一回事:

  1. datacenter_id sequence_id
  2. unused
  3. │◀────────────────── 41 bits ────────────────────▶│
  4. ┌─────┼──────────────────────────────────────────────────────┼────────┬────────┬────────────────┐
  5. 0 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0 00000 00000 0000 0000 0000
  6. └─────┴──────────────────────────────────────────────────────┴────────┴────────┴────────────────┘
  7. time in milliseconds worker_id

首先确定我们的数值是 64 位,int64 类型,被划分为四部分,不含开头的第一个 bit,因为这个 bit 是符号位。用 41 位来表示收到请求时的时间戳,单位为毫秒,然后五位来表示数据中心的 id,然后再五位来表示机器的实例 id,最后是 12 位的循环自增 id(到达 1111 1111 1111 后会归 0)。

这样的机制可以支持我们在同一台机器上,同一毫秒内产生 2 ^ 12 = 4096 条消息。一秒共 409.6w 条消息。从值域上来讲完全够用了。

数据中心 + 实例 id 共有 10 位,可以支持我们每数据中心部署 32 台机器,所有数据中心共 1024 台实例。

表示 timestamp 的 41 位,可以支持我们使用 69 年。当然,我们的时间毫秒计数不会真的从 1970 年开始记,那样我们的系统跑到 2039/9/7 23:47:35 就不能用了,所以这里的 timestamp 实际上只是相对于某个时间的增量,比如我们的系统上线是 2018-08-01,那么我们可以把这个 timestamp 当作是从 2018-08-01 00:00:00.000 的偏移量。

6.1.1 worker id 分配

timestamp,datacenter_id,worker_id 和 sequence_id 这四个字段中,timestamp 和 sequence_id 是由程序在运行期生成的。但 datacenter_id 和 worker_id 需要我们在部署阶段就能够获取得到,并且一旦程序启动之后,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最终生成的 id 有冲突)。

一般不同数据中心的机器,会提供对应的获取数据中心 id 的 api,所以 datacenter_id 我们可以在部署阶段轻松地获取到。而 worker_id 是我们逻辑上给机器分配的一个 id,这个要怎么办呢?比较简单的想法是由能够提供这种自增 id 功能的工具来支持,比如 MySQL:

  1. mysql> insert into a (ip) values("10.1.2.101");
  2. Query OK, 1 row affected (0.00 sec)
  3. mysql> select last_insert_id();
  4. +------------------+
  5. | last_insert_id() |
  6. +------------------+
  7. | 2 |
  8. +------------------+
  9. 1 row in set (0.00 sec)

从 MySQL 中获取到 worker_id 之后,就把这个 worker_id 直接持久化到本地,以避免每次上线时都需要获取新的 worker_id。让单实例的 worker_id 可以始终保持不变。

当然,使用 MySQL 相当于给我们简单的 id 生成服务增加了一个外部依赖。依赖越多,我们的服务的可运维性就越差。

考虑到集群中即使有单个 id 生成服务的实例挂了,也就是损失一段时间的一部分 id,所以我们也可以更简单暴力一些,把 worker_id 直接写在 worker 的配置中,上线时,由部署脚本完成 worker_id 字段替换。

6.1.2 开源实例

6.1.2.1 标准 snowflake 实现

github.com/bwmarrin/snowflake 是一个相当轻量化的 snowflake 的 Go 实现。其文档指出:

  1. +--------------------------------------------------------------------------+
  2. | 1 Bit Unused | 41 Bit Timestamp | 10 Bit NodeID | 12 Bit Sequence ID |
  3. +--------------------------------------------------------------------------+

和标准的 snowflake 完全一致。使用上比较简单:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "github.com/bwmarrin/snowflake"
  6. )
  7. func main() {
  8. n, err := snowflake.NewNode(1)
  9. if err != nil {
  10. println(err)
  11. os.Exit(1)
  12. }
  13. for i := 0; i < 3; i++ {
  14. id := n.Generate()
  15. fmt.Println("id", id)
  16. fmt.Println("node: ", id.Node(), "step: ", id.Step(), "time: ", id.Time(), "\n")
  17. }
  18. }

当然,这个库也给我们留好了定制的后路:

  1. // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
  2. // You may customize this to set a different epoch for your application.
  3. Epoch int64 = 1288834974657
  4. // Number of bits to use for Node
  5. // Remember, you have a total 22 bits to share between Node/Step
  6. NodeBits uint8 = 10
  7. // Number of bits to use for Step
  8. // Remember, you have a total 22 bits to share between Node/Step
  9. StepBits uint8 = 12

Epoch 就是本节开头讲的起始时间,NodeBits 指的是机器编号的位长,StepBits 指的是自增序列的位长。

6.1.2.2 sonyflake

sonyflake 是 Sony 公司的一个开源项目,基本思路和 snowflake 差不多,不过位分配上稍有不同:

  1. +-----------------------------------------------------------------------------+
  2. | 1 Bit Unused | 39 Bit Timestamp | 8 Bit Sequence ID | 16 Bit Machine ID |
  3. +-----------------------------------------------------------------------------+

这里的时间只用了 39 个 bit,但时间的单位变成了 10ms,所以理论上比 41 位表示的时间还要久(174 years)。

Sequence ID 和之前的定义一致,Machine ID 其实就是节点 id。sonyflake 与众不同的地方在于其在启动阶段的 setting 配置:

  1. func NewSonyflake(st Settings) *Sonyflake

Settings 数据结构如下:

  1. type Settings struct {
  2. StartTime time.Time
  3. MachineID func() (uint16, error)
  4. CheckMachineID func(uint16) bool
  5. }

StartTime 选项和我们之前的 Epoch 差不多,如果不设置的话,默认是从 2014-09-01 00:00:00 +0000 UTC 开始。

MachineID 可以由用户自定义的函数,如果用户不定义的话,会默认将本机 ip 的低 16 位作为 machine id。

CheckMachineID 是由用户提供的检查 MachineID 是否冲突的函数。这里的设计还是比较巧妙的,如果有另外的中心化存储并支持检查重复的存储,那我们就可以按照自己的想法随意定制这个检查 MachineID 是否冲突的逻辑。如果公司有现成的 Redis 集群,那么我们可以很轻松地用 Redis 的 set 来检查冲突。

  1. redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
  2. (integer) 1
  3. redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
  4. (integer) 0

使用起来也比较简单,有一些逻辑简单的函数就略去实现了:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. "github.com/sony/sonyflake"
  7. )
  8. func getMachineID() (uint16, error) {
  9. var machineID uint16
  10. var err error
  11. machineID = readMachineIDFromLocalFile()
  12. if machineID == 0 {
  13. machineID, err = generateMachineID()
  14. if err != nil {
  15. return 0, err
  16. }
  17. }
  18. return machineID, nil
  19. }
  20. func checkMachineID(machineID uint16) bool {
  21. saddResult, err := saddMachineIDToRedisSet()
  22. if err != nil || saddResult == 0 {
  23. return true
  24. }
  25. err := saveMachineIDToLocalFile(machineID)
  26. if err != nil {
  27. return true
  28. }
  29. return false
  30. }
  31. func main() {
  32. t, _ := time.Parse("2006-01-02", "2018-01-01")
  33. settings := sonyflake.Settings{
  34. StartTime: t,
  35. MachineID: getMachineID,
  36. CheckMachineID: checkMachineID,
  37. }
  38. sf := sonyflake.NewSonyflake(settings)
  39. id, err := sf.NextID()
  40. if err != nil {
  41. fmt.Println(err)
  42. os.Exit(1)
  43. }
  44. fmt.Println(id)
  45. }