这一节我们看下怎么创建一个多实例的集群(以三个节点为例)。
一、 创建配置文件
进入到 Kafka 主目录,以 config/server.properties 为原型,创建两个新的节点配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
按如下方式修改这两个配置文件的相关属性
config/server-1.properties:
listeners=PLAINTEXT://:9093
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
listeners=PLAINTEXT://:9094
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
上述属性中 broker.id 在集群中必须是唯一且永久的。我们打算在同一台机器上启动另两个节点实例,为避免冲突,端口和日志文件路径也做了相应的修改。
二、启动服务
输入如下命令启动服务:
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
jps 一下,一共可以看到三个 Kafka 进程。
三、 创建主题
现在我们创建一个具有三份副本、两个划分的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic2
我们可以通过 describe 参数查看集群中对刚刚创建的主题的配置情况:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
Topic:my-replicated-topic2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: my-replicated-topic2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
这里解释一下上述输出:第一行给出了所有划分的摘要信息,接下来逐行显示每个划分的详细信息。
- leader 节点响应对应划分的全部读写请求。
- replicas 是一个节点列表,他们产生相应划分的副本信息,里面的节点不一定都是存活的。
- isr 是一组非同步(in-sync)的备份,它是 replicas 的子集,里面的节点当前都是存活的。
四、 验证可靠性
接下来我们验证一下多副本的可靠性。先灌两条消息到之前创建的主题上:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic2
test message 1
test message 2
读一下看看:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic2
test message 1
test message 2
(注意因为我们申请了两个划分,上述命令的结果有可能顺序不一致,也就是 test message 2 在前面)
我们通过这个命令找到 id 为 1 的节点实例:
ps -ef | grep server-1.properties
work 15053 47875 0 17:25 pts/1
干掉这个节点进程:
这时候我们再看一下主题的状态描述:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
Topic:my-replicated-topic2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: my-replicated-topic2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,2
Topic: my-replicated-topic2 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,0
可以看出来节点 1 已经被停掉了。我们再去主题中读一下消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic eplicated-topic2
test message 2
test message 1
谢天谢地,数据还在。
相关推荐
开发使用,特此分享,所...Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等
今天小编就为大家分享一篇Python测试Kafka集群(pykafka)实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
kafka集群测试,生产、消费实例代码,kafka-demo.rar
您将需要在集群中添加实例, 确保将其部署在与openFaaS相同的名称空间中,这将使用为OpenFaaS网关上的基本身份验证创建的机密 如果kafka和redis需要安全连接,请在openFaas命名空间中创建这些秘密 kubectl create ...
自动扩展 (AWS) 集群以处理来自 kafka 集群的主题: 工作项目...... 主要职责: 验证新主题 注册新工人(节点) 向工作人员询问负载、内存使用情况和网络 使用节点信息决定向何处发送新作业 工人主管 保存kafka...
Kafka REST代理 Kafka REST代理为Kafka集群提供了RESTful接口。 它使生成和使用消息,查看群集状态以及执行管理操作变...以下内容假设您已经使用默认设置和一些已创建的主题运行Kafka和REST Proxy实例。 # Get a list
Kafka Streams将数据存储在Kafka集群(Kafka State Stores)中,并快速获取数据检票口。 该存储库通过Kafka Streaming演示了事件源(材料化视图)( ) 在典型的生产环境中,我们有多个带有专用数据库的微服务,...
kafka版本:kafka_2.11-0.9.0.1 kafka jar包版本:0.9.0.1 ...kafka集群:192.168.1.101,192.168.1.102,192.168.1.103 partition分发策略主要是自定义Partitioner的实现类,通过根据key和分区数量来实现
kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper...
对于集群中的每个Kafka实例 KAFKA_CLIENT_<i>_SERVICE_HOST :etcd实例的IP地址 KAFKA_CLIENT_<i>_SERVICE_PORT :etcd客户端服务的端口号(4001) 对于每个Zookeeper实例 ZK_CLIENT_<i>_SERVICE_HOST :Zookeeper...
《Apache Kafka实战》是涵盖Apache Kafka各方面的具有实践指导意义的工具书和参考书。...第7~9章以实例的方式讲解了Kafka集群的管理、监控与调优;第10章介绍了Kafka新引入的流式处理组件。 《Apache Kafk...
杂乱的集群 该Scrapy项目使用Redis和Kafka创建按需分布式抓取集群。 目标是在许多等待的蜘蛛实例之间分发种子URL,这些蜘蛛实例的请求通过Redis进行协调。 由于边界扩展或深度遍历而导致的任何其他爬网也会在群集中...
一组简单的项目,通过使用我们的来说明往返于Kafka集群和Diffusion Cloud实例的外汇(fx)数据流的生产和使用。 这些JavaScript代码示例将帮助您从前端应用程序实时将fx数据发布到Kafka集群,从中进行消费并通过...
kafka模拟生产者、消费者,集群模式,若是单机版,将ip端口组改为相应ip端口即可;
该存储库包含使用Prometheus设置监视运行中的Apache Kafka集群以及可用于测试监视设置的虚拟客户端(生产者,使用者和kafka流)所必需的建筑群和配置。 建筑模块 Graphana 目录包含设置dockerized prometheus实例...
zookeeper,以使用和生成本地计算机上的消息要求您需要python3才能使用kafkatunnel.py python3 点3 注意:对于AWS模式:您需要使用Name = kafka / zookeeper标记ec2 zookeeper和kafka实例参见安装$ git clone ...
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者...Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。文档介绍有关Kafka的简单安装和使用。
由易到难,包括以下这几本书:kafka的初认识-v3,Kafka 核心源码剖析-v1,Kafka 核心源码剖析-v2,Kafka 基础实战 :消费者和生产者实例-v3,kafka 消息处理过程与集群维护-v2,
在kafka集群中,每个代理节点(Broker)在启动都会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的leader节点。 (1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/...
分区:如果主题很大,kafka将主题划分为不同的集群,每个集群存储在系统中,我们决定何时进行分区以及应该进行多少分区 偏移量:消息到达分区的序列号,偏移量在分区本地 消费者组:作为单个单元的消费者组,可以...