博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka 实战
阅读量:3904 次
发布时间:2019-05-23

本文共 5637 字,大约阅读时间需要 18 分钟。

Kafka 实战

一、kafka集群的安装与搭建

1.1 kafka下载

下载地址:

1.2 服务器准备

准备三台服务器。

如果是虚拟主机,要分别配置好ip地址和主机名称。
三台主机分别关闭防火墙 ,命令: chkconfig iptables off

1.3 解压安装包

tar -zxvf kafka_2.11-1.0.0.tgz -C …/servers/

1.4 修改配置文件

找到配置文件目录

cd /export/servers/kafka_2.11-1.0.0/config
编辑配置文件
vim server.properties

配置文件修改如下:

# 注意:每个服务器的 broker.id不一样 ,三台服务器可以分别配置 0、1、2broker.id=0    #处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的线程数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的最大缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径log.dirs=/export/servers/kafka_2.11-1.0.0/logs#topic在当前broker上的分区个数num.partitions=2#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.flush.interval.messages=10000log.flush.interval.ms=1000#segment文件保留的最长时间,超时将被删除log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000#配置连接Zookeeper集群地址zookeeper.connect=node01:2181,node02:2181,node03:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0#是否允许删除topicdelete.topic.enable=true# 注意:每个服务器的 host.name不一样 host.name=node01

注意,三台服务的配置,有些参数不能一样: 如host.name 和 broker.id

broker.id不能重复。

1.5配置环境变量

vi /etc/profile

#KAFKA_HOMEexport KAFKA_HOME=/opt/module/kafkaexport PATH=$PATH:$KAFKA_HOME/bin

1.6启动集群

三台服务器,都完成 kafka的解压、修改配置文件、配置环境变量后。

每台服务器依次启动kafka,输入命令:
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

1.7kafka的命令行的使用

创建topic

./kafka-topics.sh  --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181

查看所有的topic

./kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

kafka的消息发送

./kafka-console-producer.sh  --broker-list node01:9092,node02:9092,node03:9092   --topic test

kafka消息的消费

./kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning  --topic test

使用zk来连接集群

./kafka-console-consumer.sh --zookeeper  node01:2181,node02:2181,node03:2181 --from-beginning  --topic test

二、kafka的API使用

第一步:创建maven工程,导入jar包

org.apache.kafka
kafka-clients
1.0.0

第二步:kafka的生产者API

public class KafkaProducerStudy {
//通过javaAPI操作kafka的生产者,往test这个topic里面生产消息 public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("acks", "all"); //kafka的一个消息确认机制,确保消息的不丢失 props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer
(props); for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord
("test", "hello world"+i)); //ProducerRecord 使用两个形参,第一个形参是我们的topic主题,第二个参数就是我们需要发送的消息 } producer.close(); }}

第三步:kafka的消费者的API

自动管理offset:

public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer
(props); consumer.subscribe(Arrays.asList("test")); while (true) {
ConsumerRecords
records = consumer.poll(100); for (ConsumerRecord
record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }

kafka的手动管理offset:

public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");//如果需要手动管理offset,一定要注意,这个配置要给false props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer
consumer = new KafkaConsumer
(props); consumer.subscribe(Arrays.asList("test")); final int minBatchSize = 10; List
> buffer = new ArrayList
>(); while (true) { ConsumerRecords
records = consumer.poll(100);//拉取数据 for (ConsumerRecord
record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer);实现自己的业务逻辑在这里 consumer.commitSync();//一批次的提交我们的offset buffer.clear(); } }}

三、kafkaManager管理工具的使用

简介

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:

  • 管理多个集群
  • 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
  • 运行首选副本选举
  • 使用选项生成分区分配以选择要使用的代理
  • 运行分区重新分配(基于生成的分配)
  • 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
  • 删除主题(仅支持0.8.2+并记住在代理配​​置中设置delete.topic.enable = true)
  • 主题列表现在指示标记为删除的主题(仅支持0.8.2+)
  • 批量生成多个主题的分区分配,并可选择要使用的代理
  • 批量运行重新分配多个主题的分区
  • 将分区添加到现有主题
  • 更新现有主题的配置
    kafka-manager 项目地址:

kafkaManager内容转自()

详细配置、启动操作,请阅读原文。

启动后,可以看到kafka运行中的情况:

在这里插入图片描述

转载地址:http://homen.baihongyu.com/

你可能感兴趣的文章
怎样做研究
查看>>
labview 局部变量问题
查看>>
labview 循环外部与数组相连时问题
查看>>
哈佛大学凌晨4点半的景象--哈佛图书馆的二十条训言
查看>>
Outlook2010到处通讯录
查看>>
Gmail导入通讯录
查看>>
小米笔记本安装Win 10历程
查看>>
【转】SLAM 论文阅读和分类整理
查看>>
【转】Ubuntu 16.04 重置密码(忘记密码)
查看>>
【转】信息奥赛一本通1185:单词排序(OJ题目描述有问题)
查看>>
webclient
查看>>
从百度MP3搜索结果中提取歌曲列表
查看>>
Python Set
查看>>
SWT 中实现最小化到托盘图标,并只能通过托盘的弹出菜单关闭程序
查看>>
Java Table Examples
查看>>
Java read file
查看>>
界面主线程,子线程更新主界面控件
查看>>
敲两遍引号键才出现
查看>>
欢迎使用CSDN-markdown编辑器
查看>>
剑指Offer
查看>>