仿牛客社区项目(第五章)(上)
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
文章目录
第三章Kafka构建TB级异步消息系统
一、阻塞队列
-
BlockingQueue
- 解决线程通信的问题。
- 阻塞方法
put
、take
。
-
生产者消费者模式
- 生产者产生数据的线程。
- 消费者使用数据的线程。
-
实现类
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue、SynchronousQueue、DelayQueue
等。
1. 阻塞队列测试方法
在 test
中添加 BlockingQueueTests
类来表示阻塞队列的测试方法代码如下
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run(){
try{
for(int i = 0; i < 20; i ++ ) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产"+ queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true){
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName()+"消费"+ queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. 测试结果
Thread-0生产1
Thread-0生产2
Thread-0生产3
Thread-0生产4
Thread-0生产5
Thread-0生产6
Thread-0生产7
Thread-0生产8
Thread-0生产9
Thread-0生产10
Thread-3消费9
Thread-0生产10
Thread-1消费9
Thread-0生产10
Thread-2消费9
Thread-0生产10
Thread-3消费9
Thread-0生产10
Thread-1消费9
Thread-0生产10
Thread-2消费9
Thread-0生产10
Thread-1消费9
Thread-0生产10
Thread-1消费9
Thread-0生产10
Thread-3消费9
Thread-0生产10
Thread-1消费9
Thread-0生产10
Thread-1消费9
Thread-1消费8
Thread-2消费7
Thread-1消费6
Thread-2消费5
Thread-1消费4
Thread-3消费3
Thread-3消费2
Thread-2消费1
Thread-3消费0
二、Kafka入门
-
Kafka简介
- Kafka是一个分布式的流媒体平台。
- 应用消息系统、日志收集、用户行为追踪、流式处理。
-
Kafka特点
- 高吞吐量、消息持久化、高可靠性、高扩展性。
-
Kafka术语
Broker
、Zookeeper
Topic
、Partition
、Offset
Leader Replica
、Follower Replica
1. Kafka下载
Kafka官网: https://kafka.apache.org/
2. Kafka安装与配置
下载Kafka的安装包后进行解压就相当于安装成功了。
需要进行以下配置
修改 config包下的 zookeeper.properties:
修改 config包下的 server.properties:
3. Kafka的启动
首先在命令行中启动 Zookeeper
:
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动成功后不关闭此窗口重新打开一个新的命令窗口用于启动 kafka
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\kafka-server-start.bat config\server.properties
注意 当遇到“‘wmic’不是内部或外部命令也不是可运行程序”。
在C盘下找到wbem文件夹且里面包含WMIC.exe将其添加到系统变量path中去。
比如我的路径是C:\Windows\System32\wbem
在系统变量path中新建该路径。就可以正常启动Kafka了。
4. Kafka使用
-
创建主题
cd到…\kafka_2.13-2.8.0\bin\windows
这里然后输入kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建服务器端口号为9092Kafka
默认端口号的topic
指生产者发布消息存储的位置在该服务器上localhost:9092
。--replication-factor
1 指1个副本。--partitions
1 指1个分区。--topic test
指该主题名为test
。
-
以生产者身份发送消息
输入kafka-console-producer.bat --broker-list localhost:9092 --topic test
生产者身份打开服务器列表中为localhost:9092
的服务器上的test
主题。--broker-list
指服务器列表。
并且输入要发送的消息
-
以消费者身份读取消息
新打开一个命令行窗口且cd到…\kafka_2.13-2.8.0\bin\windows
并输入kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
可以看到生产者发送的消息。并且这个消息队列中可以实时传送消息。
比如在生产者的命令行中继续输入信息很快在消费者这边也能得到消息。
三、Spring整合Kafka
-
引入依赖
spring-kafka
-
配置Kafka
- 配置
server
、consumer
- 配置
-
访问Kafka
- 生产者:
kafkaTemplate.send(topic, data);
- 消费者:
@KafkaListener(topics = {"test"}
public void handleMessage(ConsumerRecord record) {}
- 生产者:
1. 引入依赖
在 pom.xml
添加相关的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
2. 配置Kafka
#KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
3. 测试
在 test
包下添加 KafkaTests
类代码如下
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
4. 测试结果
创作不易如果有帮助到你请给题解点个赞和收藏让更多的人看到
关注博主不迷路内容持续更新中。