`
sharp-fcc
  • 浏览: 105074 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kafka 设计概论

阅读更多

翻译自:

http://kafka.apache.org/design.html

核心设计:

1. 整体设计(特色)

(1) 默认使用持久化

(2) 优先考虑 吞吐率

(3) 信息的消费状态在 consumer 端记录 而不是 server 端.

(4) kafka 完全是 分布式的, produces  broker  consumer 都认为是分布式的.

2. 解决的问题:

3. 解决核心问题的方式
4. 各种mq区别

kafka 支持 实时信息处理, 同时能够处理一段时间处理一次的离线加载.

5.  设计详情:
基础:一些术语和概念:

信息是交流的基本元素. 信息被 一个 producer 发布到 topic, 从物理机上将,信息被发送到了一台 broker .  一些consumer 订阅了这个topic ,每一条信息被传输到这些consumer.

JMS,Java Message Service,是JavaEE平台最重要的规范之一, 也是企业开发中经常使用到的异步技术。JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)

kafka 是分布式的, producer , consumer , broker 都可以运行在一些集群上, 从逻辑上讲,是一个group. 这对于 broker 和 producer 是很自然的, 对于consumer 来说需要一些而外的支持.没一个consumer隶属于一个 consumer group 每条信息被传输到每个consumer 中的一个 process 中.然后一个 consumer group 允许多个processes 或者机器表现为一个单独的 consumer .  consumer group 的概念十分强大, 能够支持 JMS中提到的queue 和 topic 的语义. 为了支持 queue 的语义, 我们把所有的consumer 放到一个 单独的 consumer group 中. 为了支持 topic 语义, 没一个消费者放到自己独立的consumer group 中, 这样每个consumer 都可以收到 每一条信息.  一个更通用的方案是在我们的实际应用中, 我们会有很多个逻辑上的 consumer groups, 每一个由不同的机器作为一个逻辑整体.

信息持久化和缓存:不要害怕文件存储系统!

kafka 很大程度上依赖文件系统去存储和缓存信息.一般大众的观点是"磁盘是缓慢的",这使人们怀疑一个支持持久化的系统可以提供完美的性能. 事实上, 磁盘存储用不同的方式会有很大的性能差距;一个合适的磁盘存储的数据结构能像网络传输一样快.

事实上, 近10年来磁盘的性能有了很大的提高. 顺序写的性能在 67200 rpm  SATA RAID-5 上是  300MB /s  , 但是随机读写的性能是 50k/s ,差不多差了10000 倍.   而顺序读写适用于大部分场景, 在 ACM Queue article 中的讨论的结果, 他们发现,顺序的磁盘读写在某些特殊的情况下比随机内存读写更快.

为了补偿磁盘的性能, 现代的操作系统极端的使用主内存作为磁盘缓存.所有的读写操作都会通过这块缓存. 除非使用 direct I/O , 这种特性很难消除, 甚至如果一个process 保存一份数据的缓存, 这些数据很有可能在页面缓存中是双份的, 存储任何东西两份.

而且kafka构建在JVM之上, 任何人知道java 内存应用的必须知道两件事情:

1. 对象的内存开销很大, 经常复制储存的对象.

2. 一旦在堆中的数据增多,java 垃圾回收的代价很高.

因此我们使用文件系统,依赖页面缓存,而不是维护一个内存中的数据的cache 或者其他数据结构: 我们至少可以怀疑, 可用的cache 是否具有原子的对于所有 free memory 的读写能力, 也可以怀疑存储紧凑的自己码就比单独的对象好. 这样做的话在一个32GB的机器上, cache 将会飙升到 28- 30 GB,如果没有 GC的话.而且如果一个服务 重启, 假设这些 cache 没有丢失, 服务加载这些数据(10GB得花 10分钟), 如果缓存丢失, 则将花费更多时间,性能很糟糕. 在现代的操作系统中, 对于 cache 和文件系统的关联逻辑已经很简单, 这导致服务重启加载数据更加高效很正确. 如果你的磁盘顺序读性能很好,可以在cache 中预加载一些有用的数据.

我们的设计十分简单: 比起以往在内存中维护尽量多的数据, 在必要的时候持久化到磁盘上, 我们反过来了. 所有的数据会马上写到磁盘上, 从不 flush 数据. 事实上这些被传输到 内核的 页面缓存中, 操作系统会 执行flush 操作. 我们加了一个配置文件允许系统所有者控制 flush 策略. (每几条数据或者 每几秒钟) .这样一定量的数据有可能会丢失, 在硬件崩溃的时候.

时间复杂度

用磁盘存储这种数据结构通常是 BTree. BTree 是一种万能的数据结构, 能够支持消息系统中事务性和非事务性的语义. 这样做的代价很高, Btree的操作是 O(log N). 复杂度的. 一般O(log N) 被认为是线性复杂度的,但是这在磁盘操作中是不成立的. 磁盘寻址 10ms一次, 每一次磁盘操作只能查找一样东西, 并行操作是不被允许的. 少量的磁盘寻址会导致很大的开销.  不过现在的存储系统会混合 cache 操作和真实的磁盘物理操作, btree 在这种数据结构常常是超线性的. Btree 需要很复杂的行锁来避免锁住整科树. 大量依赖磁盘的代价是很高的.

直观的来讲,一个持久化的队列建立在简单的读和顺序写,就像一般打log 一样. 尽管这种简单的方式不像btree 一样支持丰富的语义. 但是他有很多好处, 比如他所有的操作都是O(1) 的, 读操作不会阻塞写操作. 这样性能跟数据大小是可以解偶的. 一台server 可以充分利用许多 便宜, 低转速的 1+TB SATA 硬盘. 尽管他们的随机读性能很差,  在大量读和写的情况下, 节省2/3 的费用, 和 3倍的容量.

有了这种理论上接近无限大的磁盘空间,我们可以提供一些通常消息系统没有的特性. 比如说.在kafka中, 我们能够维持数据一段时间而不是在消费后马上删除消息.

最大化效率

我们假设信息的量很大, 同时我们假设一条信息只被消费一次, 实际上有可能被多次消费, 我们会着重于消费的设计相比与生产而言.

有两种很不高效的情况: 很多网络请求, 过度的字节拷贝.

为了做到高效, API 的一个核心概念是 'message set' 这样一种抽象概念来组织信息. 这样允许网络请求来把信息组织到一起, 分摊网络往返的开销, 而不是一次发送一条单个数据. 

message set 自己本身是一个瘦API , 只来包装字节数组或者文件. 在数据传输的时候并没有单独的序列化和反序列化的步骤, 信息在需要用到的时候反序列化.

在一个broker 中维持的 信息的log 只是 message sets 的一个目录. 这种格式的抽象会被broker 和 consumer 来分享.

维护这种格式允许优化一些最重要的操作: 持久化 log块的网络传输. 现代unix 操作系统提供优化的代码用于传输数据到 socket ; 在linux 中适用, sendfile 系统调用. java 提供一个 对于 filechannel.transferTo 的系统调用. 

为了理解这种sendfile 的影响, 我们有必要理解一般情况下数据是如何从 文件传输到 socket 的:

1.  操作系统从磁盘上读取数据到内核的页面缓存中.

2.  程序从内核空间中读取数据到 用户空间的buffer 中. 

3.  程序把数据写回内核空间的 socket buffer 中.

4. 操作系统将 socket buffer 的数据拷贝到 NIC buffer 中进行网络传输.

这很显然不是高效的, 一共有4次拷贝, 两次系统调用. 而用 sendfile ,重复的copy 可以被避免, 操作系统直接把数据从页面缓存到网络. 用这种优化的路径下,只有一次到NIC buffer 的拷贝.

我们假设一个通用的情况是一个topic 有很多消费者. 用这种优化的操作, 数据被一次拷贝到页面缓存,在每次消费的时候被重用而不是在存储在内存中,当被读到的时候在被读到的时候.这样就允许信息被消费的瓶颈在网络传输.

点对点批量压缩

在许多情况下, 瓶颈在于网络传输而不是CPU .尤其是这种数据流转得通过数据中心的情况. 当然用户能够传输被压缩的数据, 不需要kafka 的支持. 但是这样会无疑u压缩率很低, 在大量数据重复的情况下.有效的压缩是把很多信息压缩在一起, 而不是单条信息单独压缩. 理想情况下, 压缩是点对点的. 数据在producer 端传输之前就压缩了. 在server 上保持压缩的状态. 在consumer 端解压.

kafka 支持message sets 的循环嵌套.  一群message 能够被批量的压缩, 在达到consumer 的时候解压.

consumer 状态

追踪什么被消费了是一个消息系统需要解决的核心的问题.这不太直观, 但是记录这个状态会很大程度上影响整个系统的性能. 状态追踪需要更新持久化信息, 会到导致随机读写, 这是无法忍受的.

大多数消息系统在broker 上用元数据记录哪些信息是被消费的. 就是说, 当一个信息被发送到consumer 那里, broker 在本地存储这些信息的内容.这是一个不明智的做法, 这中情况在一个单机的server上适用.  这种方式在许多信息系统协同工作的时候表现很糟糕, 这种方式是比较老实的, broker 知道哪些被消费了, 哪些没有被消费, 被消费了的可以立马删除这条信息, 保证数据量小.

让broker 和 consumer 达成一致,什么信息被消费了不是一个很难的问题. 如果broker 马上标识一条信息为已消费,然后把信息传出去.但是consumer 由于网络原因或者什么的没有收到这条信息, 那么这条信息就会丢失.为了解决这个问题,许多消息系统家了一个特性标识这条信息已经被发送出去,而不是 已消费. broker 等待consumer 确认这条信息已经被消费. 这种策略解决消息丢失的问题.但是会出现一种新的问题. 首先,如果consumer 在消费完成之前没有发送已消费信息, 这样信息会被消费两次. 第二个问题是性能问题, 现在 broker 需要为一条信息维持好几种状态,(首先在收到回复前得锁住这条信息, 然后标记他为永久消费,可以删除). 许多问题必须被处理, 比如说那些标识没有被发送,但是没有被确认的信息怎么办.

信息传输语义

有几种可能的信息传输的状态:

   最多消费一次----- 信息一旦被标记已经消费, 他们不会再往外发,问题是失败的时候会导致信息丢失.

   至少消费一次----- 我们保证信息会被发送出去,如果有失败,信息可能会被消费两次.

   准确一次------一次且只有一次.

kafka 做了跟用 metadata 不一样的两件事情. 首先, 这些流被分在了 brokers 上的一个独立的分区中. 这种语义表明这些分区留给producer 用, producer 决定这条信息属于哪个分区. 在一个分区中,信息被按照他们抵达的顺序做顺序存储,在发给consumer 的时候按照这样的顺序.  这表明, 比起需要为了每一条信息存储 源数据, 我们只需要存储'最高的水位' 给每一个consumer ,topic , partition 的组合. 要存储的 源信息会变得很小. 在kafka  中, 我们叫最高的水位叫 'offset ' . 

消费者状态

在kafka 中, consumer 负责维护状态信息(offset) 存储消费到哪里了.  事实上, kafka 的 consumer 库里将他们消费的状态数据存储到了 kafka 中. 然而, 也许把状态数据写到一个数据存储中心会更好.举个例子, consumer 只是把一些聚合的信息到一个集中事务性 OLTP 数据库. 在这种情况下, consumer 到数据库里看下就知道哪些信息被消费过.  这样解决了分区一致性问题. 在一些非事务性的系统里也是一样的.一个搜索系统可以存储 consumer 状态用他自己的索引段. 虽然这样没有持久化的保证, 但是这个索引会一直跟消费者状态同步: 如果一个没有被 flush 的索引段被丢失了,  索引还可以从最后消费的地方重新开始. 就像我们的 Hadoop 加载任务, 从kafka 并行加载数据, 用了类似的技巧. 一个特定的mapper  在 HDFS 写下消费的 offset 在任务的最后. 如果任务失败, 重启, 只要拿到在任务最后 offset 继续加载.

这样有个额外的好处. 一个consumer能够故意的回到一个老的offset 去重新消费数据.  这违反了一般队列的原则, 但确实现在一般consumer 的刚需. 举个例子, 如果一个consumer 有bug, 按照错误的方式消费数据, 在修正这个bug 之后, consumer能够重新消费这些数据.

推 vs 拉

一个相关的问题是consumers 是否应该从brokers 拉数据还是 brokers应该主动向consumer 推送数据. 在这个方面, kafka 遵循了传统的做法, 数据从producer 推给 broker , consumer 从 broker 拉数据. 一些类似的系统, 比如scribe flume, 关注于 log 聚合, 采用推的方式.  这样各有利弊. 一个推的系统在面对不同的consumer 的时候, broker 的控制推送数据的速率. 不要忘了,我们的目标是让consumer 尽量快的消费数据. 在一个推为基础的系统中,只能在生产的速度之下. 一个拉为基础的系统在consumer 挂了,然后他会尽力赶上现在的进度. 这样能够减轻consumer 的压力, 如果consumer 扛不住, 他自己可以调整速率,充分利用资源. 综上所述, 我们使用了 拉的模型

分布式

kafka 被设计成 在一群机器上运行的系统. 没有master 这么一个概念.  brokers 是公平的. 能够不用任何配置,动态增加, 删除. 同样的, producers 和 consumers 能够在任何时候被动态的开始. 每一个 broker 在zookeeper 中注册一些信息. 用zookeeper 注册topics 写作 producers 和 consumers . 

Producers

自动producer 负载均衡

kafka 支持 客户端的负载均衡, 用 TCP 连接做负载均衡. 在brokers 中 , 一个专用的 第四层负载均衡器专门均衡 TCP 链接. 在这种配置中,一个producer 中出去的数据会放到一个broker 中. 优点是producer 和 broker 交互只需要一个独立的TCP 链接, 并需要跟zookeeper 有交互. 这样不好的地方是, 这并不能很好的均衡, 如果一个链接传输的数据远远多于其他的.

客户端 基于zookeeper 的负载均衡解决了这些问题. 他允许producer 动态的发现一些新的brokers , 对每一个请求做负载均衡. 他允许producer 根据一些key 去给数据分区. 

基于zookeeper 的负载均衡详情如下. zookeeper 上注册了如下事件:

   一个新的broker 产生

   一个broker down掉

   一个新的topic 被注册

   一个broker 在一个已经存在的topic 上重新注册

最终, producer 维护一个连接池, 跟broker 连着. 这个池被一直保持着到那些活跃的brokers, 直到 zookeeper 事件回调. 当一个producer 的请求过来, 会对应一个broker 分区. 

异步发送

异步的非阻塞的操作是信息系统的基础. 在kafka 中, producer 提供一个选项是否选用异步的方式发送数据.这允许缓存一些请求在内存中, 在一定的时间内,或者达到一定的数量. 由于producer 产生的数据速率不一样, 这个异步的操作会帮助产生统一的到brokers 的信号量, 为了更好的网络应用和更高的吞吐. 

分区语义

假设一个应用需要为没一个人维护一个来访者的信息. 这将会发送所有的来访事件到一个特殊的分区中, 让同一个consumer 去消费这些信息. producer 有这个能力去map 信息到指定的kafka 节点和分区. 这样允许用用一个key 去在broker 上区分他们. 这个功能能够通过实现 kafka.producer.Partitioner 接口来实现, 默认是个随机的. 在上面的例子中, 我们可以用 hash(member_id)%num_partitions 来分区.

支持Hadoop 和其他批量数据加载

可伸缩的持久化允许支持批量加载数据到离线系统的能力. 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics