52
loading...
This website collects cookies to deliver better user experience
@Autowired
do atributo private Producer<String, TaxPayer> producer;
e passaremos para o construtor:
private final Producer<String, TaxPayer> producer;
/**
 * Constructor injection: stores the given producer in the {@code producer} field.
 *
 * @param producer producer bean selected by the {@code "taxpayerProducer"} qualifier
 */
@Autowired
public TaxpayerService(@Qualifier("taxpayerProducer") Producer<String, TaxPayer> producer) {
this.producer = producer;
}
// In-memory producer for tests, built with a String key serializer and an Avro value serializer.
// NOTE(review): the raw `new MockProducer(...)` produces an unchecked warning; presumably it is raw because
// KafkaAvroSerializer is not typed to TaxPayer — confirm before switching to the diamond form.
final MockProducer<String, TaxPayer> mockProducer = new MockProducer(true, new StringSerializer(), new KafkaAvroSerializer());
Producer
que a classe TaxpayerService necessita.
public class TaxpayerServiceTest {
private TaxpayerService taxpayerService;
private MockProducer<String, TaxPayer> mockProducer;
/**
 * Sends one taxpayer DTO through {@code TaxpayerService} backed by an in-memory mock producer.
 * NOTE(review): the test has no assertion yet; it only fails if {@code send} throws.
 */
@Test
void sendMessage() {
    // Assign the class field rather than declaring a local of the same name: the local shadowed
    // the field, which then was never initialised and could not be inspected after the call.
    // The constructor call is intentionally left raw, exactly as in the original.
    mockProducer = new MockProducer(true, new StringSerializer(), new KafkaAvroSerializer());
    taxpayerService = new TaxpayerService(mockProducer);

    final TaxpayerDTO taxpayerDTO = new TaxpayerDTO();
    taxpayerDTO.setDocument("12345678901");
    taxpayerDTO.setEmail("[email protected]");
    taxpayerDTO.setName("John Doe");

    taxpayerService.send(taxpayerDTO);
}
}
14:20:24.700 [main] INFO com.irs.sender.business.consumer.KafkaConsumerService - Recebendo TaxPayer
Mandando mensagem para pessoa :: Person(name=Guilherme, email=[email protected])
while(true)
para ficar sempre processando as mensagens que estavam sendo recebidas, porém essa abordagem é um pouco problemática, pois o processamento ficará sempre atrelado à thread main, um ponto levantado pelo Pedro Alves. Para resolver isso há várias abordagens, mas, como estamos usando um projeto Spring Boot, podemos criar uma tarefa agendada e, com isso, teremos uma thread em paralelo sendo executada periodicamente.
@Configuration
// Scans the Kafka consumer package and exposes the task scheduler bean declared below.
@ComponentScan(basePackages = "com.irs.sender.business.consumer", basePackageClasses = KafkaConsumerService.class)
public class ThreadPoolTaskSchedulerConfig {

    /**
     * Builds the scheduler bean: a pool of one thread whose thread names start with
     * "KafkaScheduleService".
     *
     * @return the configured task scheduler
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix("KafkaScheduleService");
        scheduler.setPoolSize(1);
        return scheduler;
    }
}
// Collaborators, all supplied through the constructor below.
private final Consumer<String, TaxPayer> kafkaConsumer;
private final Email email;
private final ThreadPoolTaskScheduler taskScheduler;
/**
 * Constructor injection: stores the three collaborators in their fields.
 *
 * @param kafkaConsumer consumer bean selected by the {@code "taxpayerConsumer"} qualifier
 * @param email         sender invoked for each received taxpayer
 * @param taskScheduler scheduler on which the polling task is registered
 */
@Autowired
public KafkaConsumerService(@Qualifier("taxpayerConsumer") Consumer<String, TaxPayer> kafkaConsumer, Email email, ThreadPoolTaskScheduler taskScheduler) {
this.kafkaConsumer = kafkaConsumer;
this.email = email;
this.taskScheduler = taskScheduler;
}
@PostConstruct
e o laço while(true)
do método receive que ficará assim:
@Override
public void receive() {
    // One cycle per call: subscribe to this service's topic, poll once (1000 ms timeout),
    // hand every record to the email sender, then commit.
    kafkaConsumer.subscribe(Collections.singleton(this.topic()));
    try {
        kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> notifyTaxpayer(record.value()));
        // Reached only when every record of the batch was processed without an exception.
        kafkaConsumer.commitSync();
    } catch (Exception ex) {
        // Any failure is logged and swallowed; a failure during processing skips the commit above.
        log.error("Erro ao processar mensagem", ex);
    }
}

/** Logs the arrival of one taxpayer and emails the person built from its name and email. */
private void notifyTaxpayer(TaxPayer taxpayer) {
    log.info("Recebendo TaxPayer");
    Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();
    email.sendMessage(person);
}
/**
 * Registers {@code receive()} with the task scheduler once the bean has been constructed.
 */
@PostConstruct
public void init() {
    // Six-field cron expression with "*" in every field, seconds included.
    final CronTrigger everySecond = new CronTrigger("* * * * * *");
    taskScheduler.schedule(this::receive, everySecond);
}
// In-memory consumer; the scheduled poll task assigns partition 0 of the topic and queues one record.
MockConsumer<String, TaxPayer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.schedulePollTask(() -> {
    consumer.rebalance(Collections.singletonList(new TopicPartition("taxpayer-avro", 0)));
    consumer.addRecord(new ConsumerRecord<String, TaxPayer>("taxpayer-avro", 0, 0L, "key", new TaxPayer("Guilherme", "11122233344", "[email protected]", true)));
});
// Beginning offset 0 for partition 0, used together with the EARLIEST reset strategy.
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("taxpayer-avro", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.subscribe(Collections.singleton("taxpayer-avro"));
private Email email;

/** Installs a stand-in {@code Email} that prints the recipient to standard output. */
void prepareEmailMock() {
    email = person -> {
        System.out.println("Mandando email teste :: " + person);
    };
}
// Runs a single receive() cycle against the service; there is no assertion,
// so the test only fails if receive() itself throws.
@Test
void testConsumer(){
service.receive();
}
20:29:45.000 [main] INFO com.irs.sender.business.consumer.KafkaConsumerService - Recebendo TaxPayer
Mandando email teste :: Person(name=Guilherme, email=[email protected])
public class KafkaConsumerServiceTest {
private MockConsumer<String, TaxPayer> consumer;
private KafkaConsumerService service;
private Email email;
private ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
private static final String TOPIC = "taxpayer-avro";
@BeforeEach
void prepareConsumer() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
this.prepareEmailMock();
consumer.schedulePollTask(() -> {
consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
consumer.addRecord(new ConsumerRecord<String, TaxPayer>(TOPIC, 0, 0l, "key", this.prepareTaxpayerMock()));
});
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition(TOPIC, 0), 0l);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.subscribe(Collections.singleton(TOPIC));
service = new KafkaConsumerService(consumer, email, taskScheduler);
}
void prepareEmailMock() {
email = person -> System.out.println("Mandando email teste :: " + person);
}
TaxPayer prepareTaxpayerMock() {
return new TaxPayer("Guilherme", "11122233344", "[email protected]", true);
}
@Test
void testConsumer(){
service.receive();
}
}