Kafka-入门及简单示例-CSDN博客

  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

    启动与简单示例

    # 命令行1
    #开启Zookeeper
    E:\>cd E:\kafka_2.13-3.6.0
    
    E:\kafka_2.13-3.6.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
    # 命令行2
    #开启Kafka
    E:\>cd E:\kafka_2.13-3.6.0
    E:\kafka_2.13-3.6.0>bin\windows\kafka-server-start.bat config\server.properties
    # 命令行3
    #创建主题
    E:\kafka_2.13-3.6.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    #往主题发送消息 生产者命令
    E:\kafka_2.13-3.6.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
    >hello
    >woe
    #查看发送的消息 消费者命令
    #命令行4
    E:\kafka_2.13-3.6.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    

    结果
    在这里插入图片描述

    Spring整合Kafka–简单示例

    在这里插入图片描述

    pom.xml

    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    		</dependency>
    

    application.properties

    #Kafka
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=community-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000
    

    一个生产者 一个消费者
    生产者在某些事件时触发消息产生
    消费者根据topic监听事件 如果有生产者产生消息 自动进行消费

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes = CommunityApplication.class)
    public class KafkaTest {
        @Autowired
        private KafkaProducer kafkaProducer;
        @Test
        public void testKafka(){
            kafkaProducer.sendMessage("test","你好啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊");
            kafkaProducer.sendMessage("test","在吗啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊");
            try {
                Thread.sleep(1000*10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    //生产者
    @Component
    class KafkaProducer{
        @Autowired
        private KafkaTemplate kafkaTemplate;
        public void sendMessage(String topic,String content){
            kafkaTemplate.send(topic,content);
        }
    }
    //消费者
    @Component
    class KafkaConsumer{
        @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord record){
            System.out.println(record.value());
        }
    }
    
    
  • 阿里云国际版折扣https://www.yundadi.com

  • 阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6