SELECT tweets.*,
users.*
FROM tweets
JOIN users ON tweets.sender_id = users.id
JOIN follows ON follows.followee_id = users.id
WHERE follows.follower_id = CURRENT_USER
ORDER BY tweets.timestamp
LIMIT TIMELINE_SIZE;
version: '2.2'
services:
  postgres:
    container_name: postgres
    image: 'postgres:11.2-alpine'
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=tweets
      - POSTGRES_PASSWORD=pass
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 3s
      retries: 7
  redis:
    container_name: redis
    image: 'redis:6.0-alpine'
    hostname: redis
    ports:
      - '6379:6379'
  zookeeper:
    container_name: zookeeper
    image: 'confluentinc/cp-zookeeper:4.0.3'
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: kafka
    image: 'confluentinc/cp-kafka:4.0.3'
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  schema-registry:
    container_name: schema-registry
    image: 'confluentinc/cp-schema-registry:4.0.3'
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    ports:
      - "8081:8081"
  wait-for-dependencies:
    image: dadarek/wait-for-dependencies
    container_name: wait-for-dependencies
    scale: 0
    command: redis:6379 schema-registry:8081 kafka:9092 zookeeper:2181 postgres:5432
We define the initial schema in the V1__Init.sql migration:

CREATE TABLE "users" (
"id" BIGSERIAL UNIQUE,
"username" varchar UNIQUE,
PRIMARY KEY ("id", "username")
);
CREATE TABLE "follows" (
"follower_id" bigint,
"followee_id" bigint,
PRIMARY KEY ("follower_id", "followee_id")
);
CREATE TABLE "tweets" (
"id" SERIAL PRIMARY KEY,
"sender_id" bigint,
"text" text,
"timestamp" timestamp
);
ALTER TABLE "follows" ADD FOREIGN KEY ("follower_id") REFERENCES "users" ("id");
ALTER TABLE "follows" ADD FOREIGN KEY ("followee_id") REFERENCES "users" ("id");
ALTER TABLE "tweets" ADD FOREIGN KEY ("sender_id") REFERENCES "users" ("id");
We place this file under postgres/migrations and add the migration container setup to the docker-compose.yml:

  migration:
    container_name: migration
    image: 'flyway/flyway:6.0.2-alpine'
    command: -url=jdbc:postgresql://postgres:5432/tweets -user=postgres -password=pass migrate
    volumes:
      - ./postgres/migrations:/flyway/sql:Z
    depends_on:
      postgres:
        condition: service_healthy
$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6, server 11.2)
Type "help" for help.
tweets=# \dt
List of relations
Schema | Name | Type | Owner
--------+-----------------------+-------+----------
public | flyway_schema_history | table | postgres
public | follows | table | postgres
public | tweets | table | postgres
public | users | table | postgres
(4 rows)
Note that we use @EqualsAndHashCode with @EqualsAndHashCode.Exclude at the id field to make our tests easier when comparing models, since the id field is generated automatically.

package com.georgeoliveira.tweets.common.tweets.models;
import java.sql.Timestamp;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
@Entity
@Table(schema = "public", name = "tweets")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
public class Tweet {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@EqualsAndHashCode.Exclude
Long id;
Long senderId;
String text;
Timestamp timestamp;
}
package com.georgeoliveira.tweets.common.tweets.dtos;
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
public class TweetDto {
Long id;
Long senderId;
String text;
LocalDateTime timestamp;
}
package com.georgeoliveira.tweets.common.tweets.mappers;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import java.sql.Timestamp;
import java.time.ZoneOffset;
public class TweetMapper {
public static TweetDto fromModel(Tweet tweet) {
return TweetDto.builder()
.id(tweet.getId())
.senderId(tweet.getSenderId())
.text(tweet.getText())
.timestamp(tweet.getTimestamp().toInstant().atOffset(ZoneOffset.UTC).toLocalDateTime())
.build();
}
public static Tweet fromDto(TweetDto tweetDto) {
Tweet tweet = new Tweet();
tweet.setId(tweetDto.getId());
tweet.setSenderId(tweetDto.getSenderId());
tweet.setText(tweetDto.getText());
tweet.setTimestamp(Timestamp.from(tweetDto.getTimestamp().toInstant(ZoneOffset.UTC)));
return tweet;
}
}
Then we create a JpaRepository interface for our model:

package com.georgeoliveira.tweets.common.tweets.dal.dao;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
@Repository
public interface TweetsDao extends JpaRepository<Tweet, Long> {}
By extending JpaRepository we get the basic CRUD operations for free, and we can also declare query methods, such as:

List<Tweet> findAllBySenderId(Long senderId);

which Micronaut Data implements for us at compile time.
package com.georgeoliveira.tweets.common.tweets.dal;
import com.georgeoliveira.tweets.common.tweets.dal.dao.TweetsDao;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.common.tweets.models.Tweet;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.Transactional;
@Singleton
public class TweetsDal {
@Inject TweetsDao tweetsDao;
@Transactional
public TweetDto persistTweet(TweetDto tweetDto) {
Tweet tweet = TweetMapper.fromDto(tweetDto);
Tweet persistedTweet = tweetsDao.save(tweet);
return TweetMapper.fromModel(persistedTweet);
}
}
Note the @Transactional annotation on persistTweet. Without it, we would have to manage the transaction manually, with something like:

EntityTransaction utx = entityManager.getTransaction();
try {
  utx.begin();
  businessLogic();
  utx.commit();
} catch (Exception ex) {
  utx.rollback();
  throw ex;
}
The begin() call starts a transaction, and everything from then on is considered atomic. When commit() happens, the information is persisted and the transaction is finished. If any error happens inside businessLogic(), the catch flow is triggered and a rollback happens, ensuring nothing is persisted and the transaction also finishes.

Instead of doing this by hand, we just annotate the method with @Transactional so that we can benefit from Micronaut's HibernateTransactionManager, which handles transaction management for us.

Next, we create the TweetsService that receives the post request and persists the tweet through the DAL:

package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.mappers.TweetRequestMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsService {
@Inject TweetsDal tweetsDal;
public TweetDto publishTweet(PostTweetRequestDto request) {
TweetDto tweetDto = TweetRequestMapper.fromPostRequest(request);
TweetDto persistedTweetDto = tweetsDal.persistTweet(tweetDto);
return persistedTweetDto;
}
}
package com.georgeoliveira.tweets.api.dtos;
import io.micronaut.core.annotation.Introspected;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
@Introspected
public class PostTweetRequestDto {
@NotNull @Positive Long senderId;
@NotNull @NotBlank String text;
@NotNull @Positive Long timestamp;
}
package com.georgeoliveira.tweets.api.mappers;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class TweetRequestMapper {
public static TweetDto fromPostRequest(PostTweetRequestDto postTweetRequestDto) {
return TweetDto.builder()
.id(null)
.senderId(postTweetRequestDto.getSenderId())
.text(postTweetRequestDto.getText())
.timestamp(
LocalDateTime.from(
Instant.ofEpochMilli(postTweetRequestDto.getTimestamp()).atZone(ZoneOffset.UTC)))
.build();
}
}
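As a quick sanity check on this conversion, the epoch value 1619626150979 used in the examples below corresponds to 2021-04-28T16:09:10.979 UTC, which is exactly what ends up in the tweet's timestamp (a standalone illustration, not part of the project code):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrates the epoch-millis to UTC LocalDateTime conversion done by TweetRequestMapper.
public class TimestampExample {
  public static void main(String[] args) {
    LocalDateTime ts =
        LocalDateTime.from(Instant.ofEpochMilli(1619626150979L).atZone(ZoneOffset.UTC));
    System.out.println(ts); // prints 2021-04-28T16:09:10.979
  }
}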
We use the @Controller annotation to create our Tweets controller with the POST /tweets route. We also use the @Validated annotation to ensure the constraints on the fields are applied.

package com.georgeoliveira.tweets.api.controllers;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.services.TweetsService;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.validation.Validated;
import java.util.Objects;
import javax.inject.Inject;
import javax.validation.Valid;
@Validated
@Controller("/tweets")
public class TweetsController {
@Inject TweetsService tweetsService;
@Post
public HttpResponse<Long> postTweet(@Valid @Body PostTweetRequestDto request) {
TweetDto tweet = tweetsService.publishTweet(request);
if (Objects.nonNull(tweet)) {
return HttpResponse.created(tweet.getId());
}
return HttpResponse.notFound();
}
}
Now we can build and run the application:

$ ./gradlew build -x test
$ java -jar build/libs/tweets-all.jar

and try posting a tweet:

$ curl -L -X POST 'http://localhost:8080/tweets' -H 'Content-Type: application/json' --data-raw '{
"sender_id": 12356,
"text": "xalala",
"timestamp": 1619626150979
}'
The senderId must belong to a valid user, so first we must populate the users table manually:

$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6, server 11.2)
Type "help" for help.
tweets=# INSERT INTO users(username) VALUES ('my_username');
INSERT 0 1
tweets=# SELECT id, username FROM users WHERE username = 'my_username';
id | username
----+-------------
1 | my_username
(1 row)
We define the event key Avro schema in protocols/avro/events/key.avro:
{
"type": "record",
"name": "Key",
"namespace": "com.georgeoliveira.events",
"fields": [
{
"name": "aggregate_id",
"type": [
"null",
"string"
],
"default": null
}
]
}
and the event value schema in protocols/avro/events/value.avro:
{
"type": "record",
"name": "Value",
"namespace": "com.georgeoliveira.events",
"fields": [
{
"name": "event_id",
"doc": "A valid V4 UUID. Each event has a unique id.",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "aggregate_id",
"doc": "The id of the whole aggregate that had any of the nested entities or the root entity edited.",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "type",
"doc": "The type of the event, eg. \"create\" or \"update\".",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "aggregate",
"doc": "The whole aggregate that had any of the nested entities or the root entity edited.",
"type": [
"null",
"string"
],
"default": null
}
]
}
Note the aggregate and type fields. The aggregate will contain the payload with the relevant data needed to process the event. So, for example, if we're developing a chat app and the event is "new message sent", the aggregate could be a JSON like:

{
"sender_id": ...,
"recipient_id": ...,
"message": ....
}
In our case, for a tweet event, the aggregate will look like:

{
"id": ...,
"sender_id": ...,
"text": ...,
"timestamp": ...
}
We use the type field to store the type of event that occurred, so that different consumers can decide how to process it; e.g., a post_tweet event may trigger the delivery processing, but an edit_tweet event may not.

For the tweet aggregate itself we define a Protobuf schema in protocols/proto/tweets/tweet.proto:
syntax = "proto3";
package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TweetProtobuf";
message Tweet {
int64 id = 1;
int64 sender_id = 2;
string text = 3;
int64 timestamp = 4;
}
This way, the aggregate field has the schema that the consumers expect. Also, these schemas can be versioned, so we can keep track of their evolution.

The outbox table itself follows the Value Avro schema that we defined earlier, as shown in our second migration, V2__AddTweetsOutboxTable.sql:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE tweets_outbox (
event_id UUID DEFAULT uuid_generate_v4(),
aggregate_id TEXT,
type TEXT,
aggregate JSONB,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW()
);
Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state.
To run Debezium, we add a Kafka Connect service to our docker-compose.yml:

  connect:
    image: 'debezium/connect:1.5'
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=connect_configs
      - OFFSET_STORAGE_TOPIC=connect_offsets
      - STATUS_STORAGE_TOPIC=connect_statuses
    depends_on:
      - kafka
      - postgres
Once the containers are up, we register the connector by posting its configuration to the Connect REST API:

$ curl -d @"connector-config.json" \
-H "Content-Type: application/json" \
-X POST http://connect:8083/connectors
where connector-config.json is:

{
"name": "tweets_outbox_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"slot.name": "tweets_outbox_connector",
"transforms": "unwrap,ValueToKey,SetKeySchema,SetValueSchema",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "aggregate_id",
"transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetValueSchema.schema.name": "com.georgeoliveira.events.Value",
"transforms.SetKeySchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"transforms.SetKeySchema.schema.name": "com.georgeoliveira.events.Key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081/",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081/",
"plugin.name": "wal2json_rds",
"database.server.name": "postgres",
"database.dbname": "tweets",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "pass",
"schema.include.list": "public",
"table.include.list": "public.tweets_outbox"
}
}
We can check the connector status with:

$ curl http://connect:8083/connectors/tweets_outbox_connector/status

To remove the connector, we delete it through the Connect API and drop its replication slot in Postgres:

$ curl -X DELETE http://connect:8083/connectors/tweets_outbox_connector -v

select pg_drop_replication_slot('tweets_outbox_connector');
To test the connector, we can insert a row directly into the outbox table:

$ psql -h localhost -d tweets -U postgres
Password for user postgres:
psql (12.6)
Type "help" for help.
tweets=# insert into tweets_outbox(aggregate_id, type, aggregate) values ('1', 'test', '{"test":"this is a test"}');
INSERT 0 1
tweets=#
If we then use kafkacat to consume from the Kafka topic postgres.public.tweets_outbox, we get something like:

$ kafkacat -C -b kafka:9092 -t postgres.public.tweets_outbox
H71dbed94-c3db-410e-b7a0-7b0a120e8617test4{"test": "this is a test"}����߹�
% Reached end of topic postgres.public.tweets_outbox [0] at offset 1
To write to the outbox from the application, we first map the outbox table as a model:

package com.georgeoliveira.tweets.common.tweets.outbox.models;
import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import java.sql.Timestamp;
import java.util.UUID;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import org.hibernate.annotations.TypeDefs;
@Entity
@Table(schema = "public", name = "tweets_outbox")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
@TypeDefs({@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)})
public class TweetOutbox {
@Id
@Column(name = "event_id")
@GeneratedValue(strategy = GenerationType.AUTO)
@EqualsAndHashCode.Exclude
UUID eventId;
@Column(name = "aggregate_id")
String aggregateId;
@Column(name = "type")
String type;
@Type(type = "jsonb")
@Column(name = "aggregate", columnDefinition = "jsonb")
String aggregate;
@Column(name = "created_at", insertable = false)
@EqualsAndHashCode.Exclude
Timestamp createdAt;
}
Then we create a mapper that builds an outbox entry from a TweetDto:

package com.georgeoliveira.tweets.common.tweets.outbox.mappers;
import com.georgeoliveira.tweets.proto.TweetProtobuf;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import com.googlecode.protobuf.format.JsonFormat;
import java.time.ZoneOffset;
public class TweetOutboxMapper {
public static TweetOutbox toOutboxModel(TweetDto tweetDto, EventType eventType) {
TweetProtobuf.Tweet tweetProto = toProto(tweetDto);
String aggregate = toAggregate(tweetProto);
TweetOutbox tweetOutbox = new TweetOutbox();
tweetOutbox.setType(eventType.toString());
tweetOutbox.setAggregate(aggregate);
tweetOutbox.setAggregateId(String.valueOf(tweetDto.getId()));
return tweetOutbox;
}
private static TweetProtobuf.Tweet toProto(TweetDto tweetDto) {
return TweetProtobuf.Tweet.newBuilder()
.setId(tweetDto.getId())
.setSenderId(tweetDto.getSenderId())
.setText(tweetDto.getText())
.setTimestamp(tweetDto.getTimestamp().toInstant(ZoneOffset.UTC).toEpochMilli())
.build();
}
private static String toAggregate(TweetProtobuf.Tweet tweetProto) {
JsonFormat jsonFormat = new JsonFormat();
return jsonFormat.printToString(tweetProto);
}
}
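The EventType imported by this mapper (and used as EventType.PUBLISH_TWEET further down) is not listed anywhere in the post; a minimal sketch covering only the flow built here could be:

package com.georgeoliveira.tweets.common.tweets.outbox.dtos;

// Hypothetical sketch: the event types written to the outbox "type" column.
public enum EventType {
  PUBLISH_TWEET
}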
Here the tweet is converted to its Protobuf representation and serialized as JSON into the aggregate field of the outbox.

Then we add the DAO and DAL for the outbox table:

package com.georgeoliveira.tweets.common.tweets.outbox.dal.dao;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
import java.util.UUID;
@Repository
public interface TweetsOutboxDao extends JpaRepository<TweetOutbox, UUID> {}
package com.georgeoliveira.tweets.common.tweets.outbox.dal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dal.dao.TweetsOutboxDao;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import com.georgeoliveira.tweets.common.tweets.outbox.mappers.TweetOutboxMapper;
import com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsOutboxDal {
@Inject TweetsOutboxDao tweetsOutboxDao;
public void sendToOutbox(TweetDto tweetDto, EventType eventType) {
TweetOutbox tweetOutbox = TweetOutboxMapper.toOutboxModel(tweetDto, eventType);
tweetsOutboxDao.save(tweetOutbox);
}
}
Finally, we change the TweetsService so that publishing a tweet also writes an event to the outbox:

package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto;
import com.georgeoliveira.tweets.api.mappers.TweetRequestMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.outbox.dal.TweetsOutboxDal;
import com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsService {
@Inject TweetsDal tweetsDal;
@Inject TweetsOutboxDal tweetsOutboxDal;
public TweetDto publishTweet(PostTweetRequestDto request) {
TweetDto tweetDto = TweetRequestMapper.fromPostRequest(request);
TweetDto persistedTweetDto = tweetsDal.persistTweet(tweetDto);
tweetsOutboxDal.sendToOutbox(persistedTweetDto, EventType.PUBLISH_TWEET);
return persistedTweetDto;
}
}
A timeline is simply a list of tweets associated with a user_id. We define a Protobuf schema for it:

syntax = "proto3";
package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TimelineProtobuf";
import "tweets/tweet.proto";
message Timeline {
int64 user_id = 1;
repeated com.georgeoliveira.tweets.proto.Tweet tweets = 2;
}
package com.georgeoliveira.tweets.common.timelines.dtos;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.util.List;
import lombok.Builder;
import lombok.Value;
@Builder(toBuilder = true)
@Value
public class TimelineDto {
Long userId;
List<TweetDto> tweetsList;
}
We then create a TimelineMapper that builds a TimelineDto from a list of TweetDto and maps a TimelineDto to a byte array:

package com.georgeoliveira.tweets.common.timelines.mappers;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.proto.TimelineProtobuf;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
public class TimelineMapper {
public static TimelineDto fromList(Long userId, List<TweetDto> tweetsList) {
return TimelineDto.builder().userId(userId).tweetsList(tweetsList).build();
}
public static Pair<Long, byte[]> toUserIdTimelineByteArrayPair(TimelineDto timelineDto) {
return Pair.of(timelineDto.getUserId(), toTimelineByteArray(timelineDto));
}
private static byte[] toTimelineByteArray(TimelineDto timelineDto) {
return toProto(timelineDto).toByteArray();
}
private static TimelineProtobuf.Timeline toProto(TimelineDto timelineDto) {
return TimelineProtobuf.Timeline.newBuilder()
.setUserId(timelineDto.getUserId())
.addAllTweets(
timelineDto
.getTweetsList()
.stream()
.map(tweetDto -> TweetMapper.toProto(tweetDto))
.collect(Collectors.toList()))
.build();
}
}
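Note that this uses a TweetMapper.toProto helper that was not part of the TweetMapper shown earlier. A minimal sketch, mirroring the conversion already done in TweetOutboxMapper, could be:

// Hypothetical addition to TweetMapper: converts a TweetDto into the Tweet protobuf message.
public static TweetProtobuf.Tweet toProto(TweetDto tweetDto) {
  return TweetProtobuf.Tweet.newBuilder()
      .setId(tweetDto.getId())
      .setSenderId(tweetDto.getSenderId())
      .setText(tweetDto.getText())
      .setTimestamp(tweetDto.getTimestamp().toInstant(ZoneOffset.UTC).toEpochMilli())
      .build();
}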
To talk to Redis we use Lettuce, starting with a [TimelineCommands interface](https://lettuce.io/core/release/reference/index.html#redis-command-interfaces) that defines the methods we'll use to interact with Redis:

package com.georgeoliveira.tweets.common.timelines.dal.dao;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.annotation.Command;
public interface TimelineCommands extends Commands {
@Command("SET")
void set(String userId, byte[] timelineByteArray);
@Command("GET")
byte[] get(String userId);
}
package com.georgeoliveira.tweets.common.timelines.dal.dao.factories;
import com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands;
import io.lettuce.core.RedisClient;
import io.lettuce.core.dynamic.RedisCommandFactory;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import javax.inject.Singleton;
@Factory
public class TimelineCommandsFactory {
@Value("${redis.host}")
private String REDIS_HOST;
@Singleton
TimelineCommands timelineCommands() {
RedisClient redisClient = RedisClient.create(REDIS_HOST);
RedisCommandFactory commandFactory = new RedisCommandFactory(redisClient.connect());
return commandFactory.getCommands(TimelineCommands.class);
}
}
package com.georgeoliveira.tweets.common.timelines.dal;
import com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
@Singleton
public class TimelinesDal {
@Inject TimelineCommands timelineCommandsDao;
public void persistTimeline(TimelineDto timelineDto) {
Pair<Long, byte[]> userIdTimelinePair =
TimelineMapper.toUserIdTimelineByteArrayPair(timelineDto);
timelineCommandsDao.set(
String.valueOf(userIdTimelinePair.getLeft()), userIdTimelinePair.getRight());
}
}
Post a tweet: find all the people that follow the tweet's author and insert the new tweet into their home timeline caches.
To do that, we first need to map the User model:

package com.georgeoliveira.tweets.common.users.models;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.FieldDefaults;
@Entity
@Table(schema = "public", name = "users")
@Getter
@Setter
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
@EqualsAndHashCode
public class User {
@Id @EqualsAndHashCode.Exclude Long id;
String username;
@OneToMany(fetch = FetchType.EAGER)
@JoinTable(
name = "follows",
joinColumns = {@JoinColumn(name = "followee_id")},
inverseJoinColumns = {@JoinColumn(name = "follower_id")})
List<User> followers = new ArrayList<>();
}
Note the followers field, which maps the follows association table and allows us to retrieve a user's followers. Also note that we only mapped what is useful for this specific flow, so many features required to model a "real" user were left out.

We also create the corresponding UserDto and mapper:

package com.georgeoliveira.tweets.common.users.dtos;
import java.util.List;
import lombok.Builder;
import lombok.Value;
@Value
@Builder(toBuilder = true)
public class UserDto {
Long id;
String username;
List<UserDto> followers;
}
package com.georgeoliveira.tweets.common.users.mappers;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import com.georgeoliveira.tweets.common.users.models.User;
import java.util.stream.Collectors;
public class UserMapper {
public static UserDto fromModel(User user) {
return UserDto.builder()
.id(user.getId())
.username(user.getUsername())
.followers(
user.getFollowers().stream().map(UserMapper::fromModel).collect(Collectors.toList()))
.build();
}
}
package com.georgeoliveira.tweets.common.users.dal.dao;
import com.georgeoliveira.tweets.common.users.models.User;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
@Repository
public interface UsersDao extends JpaRepository<User, Long> {}
package com.georgeoliveira.tweets.common.users.dal;
import com.georgeoliveira.tweets.common.users.dal.dao.UsersDao;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import com.georgeoliveira.tweets.common.users.mappers.UserMapper;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class UsersDal {
@Inject UsersDao usersDao;
public List<UserDto> getUserFollowers(Long userId) {
return usersDao
.findById(userId)
.map(UserMapper::fromModel)
.map(UserDto::getFollowers)
.orElse(Collections.emptyList());
}
}
We also evolve the TweetsDal component by adding the method getTimelineForUser:

public List<TweetDto> getTimelineForUser(Long userId, Long timelineSize) {
return tweetsDao
.findTimelineTweetsByUserId(userId, timelineSize)
.stream()
.map(TweetMapper::fromModel)
.collect(Collectors.toList());
}
which relies on a native query method added to TweetsDao:

@Query(
value =
"SELECT * FROM tweets t JOIN follows f ON f.followee_id = t.sender_id WHERE f.follower_id = :userId ORDER BY timestamp DESC LIMIT :limit",
nativeQuery = true)
List<Tweet> findTimelineTweetsByUserId(Long userId, Long limit);
Now we can write the processor that, for each follower of the tweet's sender, rebuilds the timeline as a TimelineDto using the mapper we defined earlier and persists it to the cache:

package com.georgeoliveira.tweets.worker.processors;
import com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper;
import com.georgeoliveira.tweets.common.tweets.dal.TweetsDal;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.users.dal.UsersDal;
import com.georgeoliveira.tweets.common.users.dtos.UserDto;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TweetsProcessor implements Consumer<TweetDto> {
@Inject TimelinesDal timelinesDal;
@Inject TweetsDal tweetsDal;
@Inject UsersDal usersDal;
@Override
public void accept(TweetDto tweetDto) {
List<UserDto> authorFollowers = usersDal.getUserFollowers(tweetDto.getSenderId());
List<TimelineDto> timelineDtos =
authorFollowers
.stream()
.map(UserDto::getId)
.map(
userId ->
TimelineMapper.fromList(userId, tweetsDal.getTimelineForUser(userId, 100L)))
.collect(Collectors.toList());
timelineDtos.forEach(timelineDto -> timelinesDal.persistTimeline(timelineDto));
}
}
The processor is triggered by a Kafka listener that consumes the events published through the outbox:

package com.georgeoliveira.tweets.worker.listeners;
import com.georgeoliveira.events.Key;
import com.georgeoliveira.events.Value;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper;
import com.georgeoliveira.tweets.worker.processors.TweetsProcessor;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import java.io.IOException;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class TweetsListener {
@Inject TweetsProcessor tweetsProcessor;
@Topic("${topics.tweets}")
void listen(ConsumerRecord<Key, Value> event) throws IOException {
TweetDto tweetDto = TweetMapper.fromRecord(event);
tweetsProcessor.accept(tweetDto);
}
}
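The TweetMapper.fromRecord helper used by the listener is not shown earlier in the post. A minimal sketch, assuming the Avro Value's aggregate field carries the Protobuf JSON written by TweetOutboxMapper and using protobuf-java-util's JsonFormat to parse it (both assumptions, not necessarily the author's implementation), could be:

// Hypothetical addition to TweetMapper; parsing library and accessors are assumptions.
public static TweetDto fromRecord(ConsumerRecord<Key, Value> record) throws IOException {
  TweetProtobuf.Tweet.Builder builder = TweetProtobuf.Tweet.newBuilder();
  // The aggregate holds the tweet serialized as Protobuf JSON by the outbox mapper.
  com.google.protobuf.util.JsonFormat.parser()
      .ignoringUnknownFields()
      .merge(String.valueOf(record.value().getAggregate()), builder);
  TweetProtobuf.Tweet proto = builder.build();
  return TweetDto.builder()
      .id(proto.getId())
      .senderId(proto.getSenderId())
      .text(proto.getText())
      .timestamp(
          Instant.ofEpochMilli(proto.getTimestamp()).atOffset(ZoneOffset.UTC).toLocalDateTime())
      .build();
}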
Note the offsetReset strategy defined as OffsetReset.EARLIEST, which makes the consumer start reading from the earliest available records of the topic. More details on offset management are available here.

Home timeline: just read from the cache.
For that, we add a fromByteArray method to the TimelineMapper:

public static Optional<TimelineDto> fromByteArray(byte[] timelineByteArray) {
try {
TimelineProtobuf.Timeline timelineProto =
TimelineProtobuf.Timeline.parseFrom(timelineByteArray);
TimelineDto timelineDto = fromProto(timelineProto);
return Optional.of(timelineDto);
} catch (InvalidProtocolBufferException | NullPointerException e) {
return Optional.empty();
}
}
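The fromProto helper used above is also not shown; a minimal sketch that rebuilds the DTOs from the Timeline protobuf could be:

// Hypothetical private helper in TimelineMapper: converts the Timeline protobuf back into DTOs.
private static TimelineDto fromProto(TimelineProtobuf.Timeline timelineProto) {
  List<TweetDto> tweets =
      timelineProto
          .getTweetsList()
          .stream()
          .map(
              proto ->
                  TweetDto.builder()
                      .id(proto.getId())
                      .senderId(proto.getSenderId())
                      .text(proto.getText())
                      .timestamp(
                          Instant.ofEpochMilli(proto.getTimestamp())
                              .atOffset(ZoneOffset.UTC)
                              .toLocalDateTime())
                      .build())
          .collect(Collectors.toList());
  return TimelineDto.builder().userId(timelineProto.getUserId()).tweetsList(tweets).build();
}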
and a getUserTimeline method to the TimelinesDal:

public Optional<TimelineDto> getUserTimeline(Long userId) {
byte[] timelineByteArray = timelineCommandsDao.get(String.valueOf(userId));
return TimelineMapper.fromByteArray(timelineByteArray);
}
Then we create a TimelinesService that reads the user's timeline from the cache:

package com.georgeoliveira.tweets.api.services;
import com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal;
import com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class TimelinesService {
@Inject TimelinesDal timelinesDal;
public List<TweetDto> getUserTimelineTweets(Long userId) {
return timelinesDal
.getUserTimeline(userId)
.map(TimelineDto::getTweetsList)
.orElse(Collections.emptyList());
}
}
And a TimelinesController exposing the GET /timelines/{userId} route:

package com.georgeoliveira.tweets.api.controllers;
import com.georgeoliveira.tweets.api.services.TimelinesService;
import com.georgeoliveira.tweets.common.tweets.dtos.TweetDto;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.PathVariable;
import java.util.List;
import javax.inject.Inject;
@Controller("/timelines")
public class TimelinesController {
@Inject TimelinesService timelinesService;
@Get("/{userId}")
HttpResponse<List<TweetDto>> getUserTimeline(@PathVariable Long userId) {
List<TweetDto> tweetDtoList = timelinesService.getUserTimelineTweets(userId);
if (tweetDtoList.isEmpty()) {
return HttpResponse.noContent();
}
return HttpResponse.status(HttpStatus.FOUND).body(tweetDtoList);
}
}
We can now rebuild and run the application:

$ ./gradlew build -x test
$ java -jar build/libs/tweets-all.jar
Then we create a second user and make it follow the user with id 1:

tweets=# insert into users(username) values('cool_user');
INSERT 0 1
tweets=# insert into follows(follower_id, followee_id) values(2, 1);
INSERT 0 1
Now let's post a tweet as the user with id 1:

$ curl -L -X POST 'http://localhost:8080/tweets' -H 'Content-Type: application/json' --data-raw '{
"sender_id": 1,
"text": "my new tweet",
"timestamp": 1619626150979
}'
And fetch the home timeline of the user with id 2:

$ curl http://localhost:8080/timelines/2
[{"id":2,"text":"my new tweet","timestamp":[2021,4,28,16,9,10,979000000],"senderId":1}]