<aside> 💡 Spring Kafka
Spring 환경에서 Kafka Client와 Message 구현을 서포트해주는 라이브러리
https://spring.io/projects/spring-kafka
</aside>
Spring Initializr에서 프로젝트 생성
Producer
Spring Kafka를 사용하기 위한 Configuration 클래스 생성 - KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
private static final String BOOTSTRAP_SERVER = "localhost:9092";
/**
* Kafka 접속을 위한 설정 및 Spring Kafka가 제공하는 Kafka Template을 사용하기 위해 등록
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* 메시지 발송 모듈에서 사용할 Kafka Template 객체를 리턴
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, MyMessage> newProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, MyMessage> newKafkaTemplate() {
return new KafkaTemplate<>(newProducerFactory());
}
}
메시지 발행 요청을 받아줄 Web Controller 클래스 생성 - ProducerController.java
@RestController
public class ProducerController {
@Autowired
private KafkaProduceService kafkaProduceService;
@RequestMapping("/publish")
public String publish(String message) {
kafkaProduceService.send(message);
return "published a message :" + message;
}
@RequestMapping("/publish2")
public String publishWithCallback(String message) {
kafkaProduceService.sendWithCallback(message);
return "published a message with callback :" + message;
}
@RequestMapping("/publish3")
public String publishJson(MyMessage message) {
kafkaProduceService.sendJson(message);
return "published a message with callback :" + message.getName() + "," + message.getMessage();
}
}
메시지 발송을 위한 서비스 모듈을 만들어서 컨트롤러에 연결하기 위한 클래스 생성 - KafkaProduceService.java
@Service
public class KafkaProduceService {
private static final String TOPIC_NAME = "topic5";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTemplate<String, MyMessage> newKafkaTemplate;
public void sendJson(MyMessage message) {
newKafkaTemplate.send(TOPIC_NAME, message);
}
public void send(String message) {
kafkaTemplate.send(TOPIC_NAME, message);
}
/**
* 메시지 발행 후 콜백 데이터를 받아서 출력
* @param message
*/
public void sendWithCallback(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("Failed " + message + " due to : " + ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent " + message + " offset:"+result.getRecordMetadata().offset());
}
});
}