本文共 5637 字,大约阅读时间需要 18 分钟。
下载地址:
准备三台服务器。
如果是虚拟主机,要分别配置好ip地址和主机名称。 三台主机分别关闭防火墙 ,命令: chkconfig iptables offtar -zxvf kafka_2.11-1.0.0.tgz -C …/servers/
找到配置文件目录
cd /export/servers/kafka_2.11-1.0.0/config 编辑配置文件 vim server.properties配置文件修改如下:
# 注意:每个服务器的 broker.id不一样 ,三台服务器可以分别配置 0、1、2broker.id=0 #处理网络请求的线程数量num.network.threads=3#用来处理磁盘IO的线程数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的最大缓冲区大小socket.request.max.bytes=104857600#kafka运行日志存放的路径log.dirs=/export/servers/kafka_2.11-1.0.0/logs#topic在当前broker上的分区个数num.partitions=2#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.flush.interval.messages=10000log.flush.interval.ms=1000#segment文件保留的最长时间,超时将被删除log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000#配置连接Zookeeper集群地址zookeeper.connect=node01:2181,node02:2181,node03:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0#是否允许删除topicdelete.topic.enable=true# 注意:每个服务器的 host.name不一样 host.name=node01
注意,三台服务的配置,有些参数不能一样: 如host.name 和 broker.id
broker.id不能重复。vi /etc/profile
#KAFKA_HOMEexport KAFKA_HOME=/opt/module/kafkaexport PATH=$PATH:$KAFKA_HOME/bin
三台服务器,都完成 kafka的解压、修改配置文件、配置环境变量后。
每台服务器依次启动kafka,输入命令: nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &./kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node01:2181,node02:2181,node03:2181
./kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic test
./kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic test
org.apache.kafka kafka-clients 1.0.0
public class KafkaProducerStudy { //通过javaAPI操作kafka的生产者,往test这个topic里面生产消息 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("acks", "all"); //kafka的一个消息确认机制,确保消息的不丢失 props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer (props); for (int i = 0; i < 100; i++){ producer.send(new ProducerRecord ("test", "hello world"+i)); //ProducerRecord 使用两个形参,第一个形参是我们的topic主题,第二个参数就是我们需要发送的消息 } producer.close(); }}
自动管理offset:
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
kafka的手动管理offset:
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");//如果需要手动管理offset,一定要注意,这个配置要给false props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList("test")); final int minBatchSize = 10; List > buffer = new ArrayList >(); while (true) { ConsumerRecords records = consumer.poll(100);//拉取数据 for (ConsumerRecord record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer);实现自己的业务逻辑在这里 consumer.commitSync();//一批次的提交我们的offset buffer.clear(); } }}
简介
kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:kafkaManager内容转自()
详细配置、启动操作,请阅读原文。启动后,可以看到kafka运行中的情况:
转载地址:http://homen.baihongyu.com/