간단한 Java Application으로 Producer, Consumer 구현해보기

간단한 Spring Application으로 Producer, Consumer 구현해보기

<aside> 💡 Spring Kafka

Spring 환경에서 Kafka Client와 Message 구현을 서포트해주는 라이브러리

https://spring.io/projects/spring-kafka

</aside>

Untitled

Spring Initializr에서 프로젝트 생성

Untitled

Producer

  1. Spring Kafka를 사용하기 위한 Configuration 클래스 생성 - KafkaProducerConfig.java

    @Configuration
    public class KafkaProducerConfig {
    
    	private static final String BOOTSTRAP_SERVER = "localhost:9092";
    
    	/**
    	 * Kafka 접속을 위한 설정 및 Spring Kafka가 제공하는 Kafka Template을 사용하기 위해 등록
    	 * @return
    	 */
    	@Bean
    	public ProducerFactory<String, String> producerFactory() {
    		Map<String, Object> configProps = new HashMap<>();
    		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
    		return new DefaultKafkaProducerFactory<>(configProps);
    	}
    
    	/**
    	 * 메시지 발송 모듈에서 사용할 Kafka Template 객체를 리턴
    	 * @return
    	 */
    	@Bean
    	public KafkaTemplate<String, String> kafkaTemplate() {
    		return new KafkaTemplate<>(producerFactory());
    	}
    
    	@Bean
    	public ProducerFactory<String, MyMessage> newProducerFactory() {
    		Map<String, Object> configProps = new HashMap<>();
    		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
    		return new DefaultKafkaProducerFactory<>(configProps);
    	}
    
    	@Bean
    	public KafkaTemplate<String, MyMessage> newKafkaTemplate() {
    		return new KafkaTemplate<>(newProducerFactory());
    	}
    }
    
  2. 메시지 발행 요청을 받아줄 Web Controller 클래스 생성 - ProducerController.java

    @RestController
    public class ProducerController {
    
    	@Autowired
    	private KafkaProduceService kafkaProduceService;
    
    	@RequestMapping("/publish")
    	public String publish(String message) {
    		kafkaProduceService.send(message);
    		return "published a message :" + message;
    	}
    
    	@RequestMapping("/publish2")
    	public String publishWithCallback(String message) {
    		kafkaProduceService.sendWithCallback(message);
    		return "published a message with callback :" + message;
    	}
    
    	@RequestMapping("/publish3")
    	public String publishJson(MyMessage message) {
    		kafkaProduceService.sendJson(message);
    		return "published a message with callback :" + message.getName() + "," + message.getMessage();
    	}
    
    }
    
  3. 메시지 발송을 위한 서비스 모듈을 만들어서 컨트롤러에 연결하기 위한 클래스 생성 - KafkaProduceService.java

    @Service
    public class KafkaProduceService {
    
    	private static final String TOPIC_NAME = "topic5";
    
    	@Autowired
    	private KafkaTemplate<String, String> kafkaTemplate;
    
    	@Autowired
    	private KafkaTemplate<String, MyMessage> newKafkaTemplate;
    
    	public void sendJson(MyMessage message) {
    		newKafkaTemplate.send(TOPIC_NAME, message);
    	}
    
    	public void send(String message) {
    		kafkaTemplate.send(TOPIC_NAME, message);
    	}
    
    	/**
    	 * 메시지 발행 후 콜백 데이터를 받아서 출력
    	 * @param message
    	 */
    	public void sendWithCallback(String message) {
    		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, message);
    
    		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    			@Override
    			public void onFailure(Throwable ex) {
    				System.out.println("Failed " + message + " due to : " + ex.getMessage());
    			}
    
    			@Override
    			public void onSuccess(SendResult<String, String> result) {
    				System.out.println("Sent " + message + " offset:"+result.getRecordMetadata().offset());
    			}
    		});
    	}