一、Kafka与Redpanda对比分析
1.1 核心差异
| 特性 | Apache Kafka | Redpanda |
|---|---|---|
| 架构 | JVM-based,旧版本需要ZooKeeper(3.3起KRaft模式可用于生产,4.0起已移除ZooKeeper) | C++编写,无外部依赖 |
| 性能 | 高吞吐量,相对较高延迟 | 更高吞吐量,更低延迟 |
| 资源占用 | 较高(JVM开销) | 更低(原生编译) |
| 部署复杂度 | ZooKeeper模式需要额外部署协调集群;KRaft模式无需ZooKeeper | 单二进制文件,简化部署 |
| 兼容性 | 原生Kafka协议 | 完全兼容Kafka协议 |
| 运维 | 成熟工具链 | 简化运维,内置监控 |
1.2 适用场景
选择Kafka的情况:
- 已有Kafka技术栈和运维经验
- 需要成熟的生态系统和工具链
- 大规模企业级部署
选择Redpanda的情况:
- 追求更高性能和更低延迟
- 希望简化部署和运维
- 资源受限环境
- 快速原型开发
二、Redpanda部署配置
2.1 Docker Compose配置(docker-compose.yml)
# Local development stack: one Redpanda broker (redpanda-0) plus Redpanda Console.
version: '3.3'

services:
  redpanda-0:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # The advertised address is what clients are told to reconnect to after
      # bootstrapping. The external listener advertises localhost so that it
      # matches the localhost:19092 bootstrap address used by clients on the
      # Docker host. For clients on other machines, replace localhost with an
      # address of the Docker host that those clients can reach.
      - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      - --rpc-addr redpanda-0:33145
      - --advertise-rpc-addr redpanda-0:33145
      - --mode dev-container
      - --smp 1
      - --default-log-level=info
    image: redpandadata/redpanda:v25.3.1
    container_name: redpanda-0
    volumes:
      - redpanda-0:/var/lib/redpanda/data
    networks:
      - redpanda_network
    ports:
      # Host ports for the external Schema Registry, HTTP Proxy and Kafka
      # listeners, plus the Admin API (container port 9644).
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644

  console:
    container_name: redpanda-console
    image: redpandadata/console:v3.3.1
    networks:
      - redpanda_network
    entrypoint: /bin/sh
    # Writes the inline configuration below to /tmp/config.yml, then starts Console.
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda-0:9092"]
        schemaRegistry:
          enabled: true
          urls: ["http://redpanda-0:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda-0:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda-0

networks:
  redpanda_network:
    driver: bridge
    ipam:
      driver: default
      config:
        # NOTE(review): 172.101.0.0/16 lies outside the RFC 1918 private range
        # 172.16.0.0/12; consider switching to a private subnet.
        - subnet: 172.101.0.0/16

volumes:
  redpanda-0:
2.2 启动命令
# Start the services in the background
docker-compose up -d

# Show the status of the services
docker-compose ps

# Follow the broker logs (the Compose service is named redpanda-0, not redpanda)
docker-compose logs -f redpanda-0

# Stop and remove the services
docker-compose down
三、Spring Boot 3集成案例
3.1 项目结构
src/
├── main/
│   ├── java/
│   │   └── com/example/redpandademo/
│   │       ├── RedpandaDemoApplication.java
│   │       ├── config/
│   │       │   └── KafkaConfig.java
│   │       ├── controller/
│   │       │   └── DemoController.java
│   │       ├── service/
│   │       │   ├── KafkaProducerService.java
│   │       │   └── KafkaConsumerService.java
│   │       └── events/
│   │           └── UserEvent.java
│   └── resources/
│       └── application.yml
└── test/
    └── java/
        └── com/example/redpandademo/
            └── RedpandaDemoApplicationTests.java
3.2 Maven依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot 3 + Redpanda demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>redpanda-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>17</java.version>
        <confluent.version>7.5.1</confluent.version>
    </properties>

    <!-- The io.confluent artifacts are published to Confluent's own repository,
         not Maven Central, so kafka-avro-serializer cannot be resolved without it. -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.3 应用配置
application.yml
spring:
  application:
    name: redpanda-demo
  kafka:
    # External Kafka listener of the Redpanda container (host port 19092).
    bootstrap-servers: localhost:19092
    properties:
      # External Schema Registry listener of the Redpanda container.
      schema.registry.url: http://localhost:18081

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        retries: 3
        linger.ms: 10
        batch.size: 16384

    consumer:
      group-id: redpanda-demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Packages whose classes the JSON deserializer is allowed to instantiate.
        spring.json.trusted.packages: "com.example.redpandademo.events"

server:
  # Host port 8080 is already published by the Redpanda Console container
  # (8080:8080 in docker-compose.yml), so the application must listen on a
  # different port when both run on the same machine.
  port: 8090

logging:
  level:
    com.example.redpandademo: DEBUG
    org.apache.kafka: WARN
3.4 配置类
KafkaConfig.java
1 2 3package com.example.redpandademo.config; 4 5import org.apache.kafka.clients.admin.NewTopic; 6import org.springframework.context.annotation.Bean; 7import org.springframework.context.annotation.Configuration; 8import org.springframework.kafka.config.TopicBuilder; 9 10@Configuration 11public class KafkaConfig { 12 13 @Bean 14 public NewTopic userEventsTopic() { 15 return TopicBuilder.name("user-events") 16 .partitions(3) 17 .replicas(1) 18 .build(); 19 } 20 21 @Bean 22 public NewTopic orderEventsTopic() { 23 return TopicBuilder.name("order-events") 24 .partitions(5) 25 .replicas(1) 26 .build(); 27 } 28} 29
3.5 消息实体类
UserEvent.java
package com.example.redpandademo.events;

import java.time.LocalDateTime;

/**
 * Mutable message payload describing something a user did.
 *
 * <p>All fields are plain properties with a getter and a setter.
 */
public class UserEvent {

    private String userId;
    private String eventType;
    private String email;
    private LocalDateTime timestamp;

    /** Creates an event with every field left {@code null}. */
    public UserEvent() {
    }

    /**
     * Creates an event stamped with the current local date-time.
     *
     * @param userId    identifier of the user the event belongs to
     * @param eventType event type name
     * @param email     e-mail address of the user
     */
    public UserEvent(String userId, String eventType, String email) {
        this.timestamp = LocalDateTime.now();
        this.email = email;
        this.eventType = eventType;
        this.userId = userId;
    }

    // Accessors and mutators

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }

    /** Renders all four fields; unset fields appear as {@code null}. */
    @Override
    public String toString() {
        return "UserEvent{userId='" + userId
                + "', eventType='" + eventType
                + "', email='" + email
                + "', timestamp=" + timestamp
                + "}";
    }
}
3.6 消息生产者服务
KafkaProducerService.java
package com.example.redpandademo.service;

import com.example.redpandademo.events.UserEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Publishes messages through a {@link KafkaTemplate} and logs the outcome of each send.
 *
 * <p>Both methods return right after handing the record to the template; the
 * result is only reported through the log once the returned future completes.
 */
@Service
public class KafkaProducerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a user event keyed by its user id.
     *
     * @param topic     destination topic
     * @param userEvent event to publish; its {@code userId} becomes the record key
     */
    public void sendUserEvent(String topic, UserEvent userEvent) {
        String key = userEvent.getUserId();
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(topic, key, userEvent);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("消息发送成功: topic={}, key={}, partition={}, offset={}",
                        topic, key,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                // Pass the throwable itself (not just its message) so the stack
                // trace and cause chain are logged, as sendWithCallback does.
                log.error("消息发送失败: topic={}, key={}", topic, key, ex);
            }
        });
    }

    /**
     * Sends a plain string message with an explicit key.
     *
     * @param topic   destination topic
     * @param key     record key
     * @param message record value
     */
    public void sendWithCallback(String topic, String key, String message) {
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(topic, key, message);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("简单消息发送成功: topic={}, key={}, message={}",
                        topic, key, message);
            } else {
                log.error("简单消息发送失败: topic={}, key={}", topic, key, ex);
            }
        });
    }
}
3.7 消息消费者服务
KafkaConsumerService.java
1 2 3package com.example.redpandademo.service; 4 5import com.example.redpandademo.events.UserEvent; 6import org.slf4j.Logger; 7import org.slf4j.LoggerFactory; 8import org.springframework.kafka.annotation.KafkaListener; 9import org.springframework.kafka.support.KafkaHeaders; 10import org.springframework.messaging.handler.annotation.Header; 11import org.springframework.messaging.handler.annotation.Payload; 12import org.springframework.stereotype.Service; 13 14@Service 15public class KafkaConsumerService { 16 17 private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); 18 19 @KafkaListener(topics = "user-events", groupId = "user-group") 20 public void consumeUserEvent(@Payload UserEvent userEvent, 21 @Header(KafkaHeaders.RECEIVED_KEY) String key, 22 @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, 23 @Header(KafkaHeaders.OFFSET) long offset) { 24 log.info("收到用户事件: key={}, partition={}, offset={}, event={}", 25 key, partition, offset, userEvent); 26 27 processUserEvent(userEvent); 28 } 29 30 @KafkaListener(topics = "order-events", groupId = "order-group") 31 public void consumeOrderEvent(String message) { 32 log.info("收到订单事件: {}", message); 33 processOrderEvent(message); 34 } 35 36 private void processUserEvent(UserEvent userEvent) { 37 log.info("处理用户事件: {}", userEvent.getEventType()); 38 39 switch (userEvent.getEventType()) { 40 case "REGISTER": 41 log.info("新用户注册: {}", userEvent.getEmail()); 42 break; 43 case "LOGIN": 44 log.info("用户登录: {}", userEvent.getEmail()); 45 break; 46 default: 47 log.info("未知用户事件类型: {}", userEvent.getEventType()); 48 } 49 } 50 51 private void processOrderEvent(String message) { 52 log.info("处理订单事件: {}", message); 53 } 54} 55
3.8 REST控制器
DemoController.java
1 2package com.example.redpandademo.controller; 3 4import com.example.redpandademo.events.UserEvent; 5import com.example.redpandademo.service.KafkaProducerService; 6import org.springframework.web.bind.annotation.*; 7 8@RestController 9@RequestMapping("/api/kafka") 10public class DemoController { 11 12 private final KafkaProducerService kafkaProducerService; 13 14 public DemoController(KafkaProducerService kafkaProducerService) { 15 this.kafkaProducerService = kafkaProducerService; 16 } 17 18 @PostMapping("/user-event") 19 public String sendUserEvent(@RequestParam String userId, 20 @RequestParam String eventType, 21 @RequestParam String email) { 22 UserEvent userEvent = new UserEvent(userId, eventType, email); 23 kafkaProducerService.sendUserEvent("user-events", userEvent); 24 return "用户事件发送成功"; 25 } 26 27 @PostMapping("/message") 28 public String sendMessage(@RequestParam String topic, 29 @RequestParam String key, 30 @RequestParam String message) { 31 kafkaProducerService.sendWithCallback(topic, key, message); 32 return "消息发送成功"; 33 } 34 35 @GetMapping("/health") 36 public String health() { 37 return "Spring Boot 3 + Redpanda 服务运行正常"; 38 } 39} 40
3.9 主应用类
RedpandaDemoApplication.java
1 2 3package com.example.redpandademo; 4 5import org.springframework.boot.SpringApplication; 6import org.springframework.boot.autoconfigure.SpringBootApplication; 7 8@SpringBootApplication 9public class RedpandaDemoApplication { 10 11 public static void main(String[] args) { 12 SpringApplication.run(RedpandaDemoApplication.class, args); 13 } 14} 15