38
loading...
This website collects cookies to deliver better user experience
version: '3.8'
services:
  # ... (other services omitted in this excerpt)

  # MongoDB instance; root credentials come from the environment (.env file).
  mongodb:
    image: mongo
    container_name: mongodb
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
      MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
      MONGO_INITDB_DATABASE: solar_system_info
    ports:
      - 27017:27017

  # One-shot seeding container: imports initial data into MongoDB and exits.
  mongodb-seed:
    image: mongo
    container_name: mongodb-seed
    depends_on:
      - mongodb
    volumes:
      - ./mongodb-init:/mongodb-init
    links:
      - mongodb
    command:
      mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

  # Redis instance used for caching, rate limiting, and pub/sub.
  redis:
    image: redis:alpine
    container_name: redis
    ports:
      - 6379:6379
The mongodb-seed container will be described further. You can enter the mongo shell by running the following command:

docker exec -it mongodb mongo --username admin --password password

(mongodb is the name of the Docker container, mongo is the shell). You can list all databases using show dbs and
list all inserted data
use solar_system_info
show collections
db.planets.find()
docker exec -it redis redis-cli
> set mykey somevalue
OK
> get mykey
"somevalue"
You can view all keys using the keys * command.

Data seeding is implemented with the mongodb-seed container and the mongoimport command running inside it:

mongodb-seed:
# NOTE(review): YAML indentation was lost when this article was extracted;
# the keys below belong nested under the `mongodb-seed` service entry.
image: mongo
container_name: mongodb-seed
# Wait for the database container before seeding.
depends_on:
- mongodb
# Seed data (and images) are mounted from the host.
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
# Import the planets collection, then upload image files via GridFS.
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg
(the images folder should be mounted as a volume in the Docker Compose service definition)

[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"
[dependencies]
mongodb = "2.0.0-beta.1" # official async MongoDB driver (pre-release)
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] } # tokio runtime support + ConnectionManager
actix-web = "4.0.0-beta.7" # web framework (pre-release of actix-web 4)
tokio = "1.7.1" # async runtime
tokio-stream = "0.1.6" # ReceiverStream wrapper used for SSE
chrono = { version = "0.4.19", features = ["serde"] } # date/time with serde support
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0" # loads .env files into the process environment
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0" # embeds static assets (index.html) into the binary
mime = "0.3.16"
├───images
│
├───mongodb-init
│ init.json
│
└───src
db.rs
dto.rs
errors.rs
handlers.rs
index.html
main.rs
model.rs
redis.rs
services.rs
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Load environment variables from .env.local (ignored if the file is absent).
dotenv::from_filename(".env.local").ok();
env_logger::init();
info!("Starting MongoDB & Redis demo server");
// Connection settings are required; fail fast at startup if they are missing.
let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
let mongodb_client = MongoDbClient::new(mongodb_uri).await;
let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
// ConnectionManager multiplexes a single Redis connection and can be cloned
// cheaply; it is shared between the services below.
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
// Services are wrapped in Arc so each actix worker can hold a reference.
let planet_service = Arc::new(PlanetService::new(
mongodb_client,
redis_client,
redis_connection_manager.clone(),
));
let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));
...
}
MongoDbClient, Redis client, and Redis connection manager are created.

const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";
/// Fetches all planets from MongoDB, optionally filtered by planet type.
///
/// When `planet_type` is `Some`, a filter document on the `type` field is
/// built; otherwise the whole collection is returned.
pub async fn get_planets(
    &self,
    planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
    // Optional filter document for the `type` field.
    let filter = match planet_type {
        Some(pt) => Some(doc! { "type": pt.to_string() }),
        None => None,
    };
    // Drain the cursor into a vector, propagating any per-document error.
    let mut cursor = self.get_planets_collection().find(filter, None).await?;
    let mut planets: Vec<Planet> = Vec::new();
    while let Some(next) = cursor.next().await {
        planets.push(next?);
    }
    Ok(planets)
}
/// Typed handle to the `planets` collection in the `solar_system_info`
/// database; documents are (de)serialized directly as `Planet`.
fn get_planets_collection(&self) -> Collection<Planet> {
    let database = self.client.database(DB_NAME);
    database.collection::<Planet>(COLLECTION_NAME)
}
get_planets also contains the example on how to filter MongoDB documents by some criteria.

#[derive(Serialize, Deserialize, Debug)]
// A planet document as stored in the MongoDB `planets` collection.
pub struct Planet {
// MongoDB `_id`; `None` before insertion, hence skipped when serializing.
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub name: String,
// `type` is a Rust keyword, so the raw identifier `r#type` is required.
pub r#type: PlanetType,
// Mean radius of the planet (presumably kilometers — TODO confirm units).
pub mean_radius: f32,
// Optional list of satellites; `None` when the planet has none recorded.
pub satellites: Option<Vec<Satellite>>,
}
// Classification of a planet; serialized/deserialized by variant name
// (e.g. "IceGiant"), which is also the value used in the MongoDB filter.
#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
TerrestrialPlanet,
GasGiant,
IceGiant,
DwarfPlanet,
}
// A satellite nested inside a `Planet` document.
#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
pub name: String,
// BSON date; `None` when no spacecraft has landed on the satellite.
pub first_spacecraft_landing_date: Option<mongodb::bson::DateTime>,
}
The model shows the mapping of simple types (String, f32), as well as of: ObjectId (Planet.id), nested structures (Planet.satellites), dates (Satellite.first_spacecraft_landing_date), enums (Planet.type), and optional fields (Planet.id, Planet.satellites).

get all planets
GET
http://localhost:9000/planets
Example with filtering:
GET
http://localhost:9000/planets?type=IceGiant
create planet
POST
http://localhost:9000/planets
Body:
{
"name": "Pluto",
"type": "DwarfPlanet",
"mean_radius": 1188,
"satellites": null
}
get planet by id
GET
http://localhost:9000/{planet_id}
update planet
PUT
http://localhost:9000/{planet_id}
Body:
{
"name": "Mercury",
"type": "TerrestrialPlanet",
"mean_radius": 2439.7,
"satellites": null
}
delete planet
DELETE
http://localhost:9000/{planet_id}
get image of a planet
GET
http://localhost:9000/planets/{planet_id}/image
Use it to test caching with Redis
// Creates a Redis client from a connection URI.
// NOTE(review): `Client::open` already returns a Result, so `Ok(...?)` is only
// needed if `RedisError` here is a custom error type converted via `From`;
// if it is `redis::RedisError`, the body can be just `Client::open(redis_uri)`.
// Confirm which `RedisError` is in scope before simplifying.
pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
Ok(Client::open(redis_uri)?)
}
// Excerpt from `main`: the Redis client is created from the configured URI,
// and a tokio `ConnectionManager` (a cloneable, multiplexed connection)
// is obtained from it for shared use across services.
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
// Cache-aside lookup of a planet by id: try Redis first, fall back to
// MongoDB on a miss and populate the cache with a 60-second TTL.
pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
let cache_key = self.get_planet_cache_key(planet_id);
let mut con = self.redis_client.get_async_connection().await?;
let cached_planet = con.get(&cache_key).await?;
match cached_planet {
// Cache miss: key absent in Redis.
Value::Nil => {
debug!("Use database to retrieve a planet by id: {}", &planet_id);
let result: Planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
// SET + EXPIRE executed atomically in one pipeline; the value is
// serialized via the `ToRedisArgs` impl for `&Planet` (JSON string).
let _: () = redis::pipe()
.atomic()
.set(&cache_key, &result)
.expire(&cache_key, 60)
.query_async(&mut con)
.await?;
Ok(result)
}
// Cache hit: the stored JSON string is deserialized back into `Planet`.
Value::Data(val) => {
debug!("Use cache to retrieve a planet by id: {}", planet_id);
Ok(serde_json::from_slice(&val)?)
}
// Any other Redis reply shape is unexpected for a GET.
_ => Err(RedisError {
message: "Unexpected response from Redis".to_string(),
}),
}
}
In the first arm (Nil), the key-value pair is put into Redis using the set function (with auto expiration; its purpose will be described later); in the second arm of the match expression, the cached value is converted to the target structure. To store a structure in Redis, implement the ToRedisArgs trait for it:

impl ToRedisArgs for &Planet {
// Serializes the planet to a JSON string so it can be stored as a plain
// Redis string value (see the cache population in `get_planet`).
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
// Panics only if serialization fails, which would indicate a bug in the
// `Planet` type rather than a recoverable runtime condition.
out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
}
}
In the get_planet function, a Redis async connection is used. The following code snippet demonstrates another approach, ConnectionManager, on the example of clearing the cache using the del function:

pub async fn update_planet(
&self,
planet_id: &str,
planet: Planet,
) -> Result<Planet, CustomError> {
// Persist the update in MongoDB first; if this fails, the cache is left
// untouched (the stale entry will expire via its TTL).
let updated_planet = self
.mongodb_client
.update_planet(ObjectId::from_str(planet_id)?, planet)
.await?;
// Invalidate the cached entry via the cloneable ConnectionManager.
let cache_key = self.get_planet_cache_key(planet_id);
self.redis_connection_manager.clone().del(cache_key).await?;
Ok(updated_planet)
}
If the update fails, the del function won't be called. That can lead to incorrect data in the cache; the consequences can be reduced by the auto expiration of cached entries described earlier. ConnectionManager can be cloned; it is also used in all the remaining Redis examples instead of the Redis client (for example, with the set/get functions):

pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
let cache_key = self.get_image_cache_key(planet_id);
let mut redis_connection_manager = self.redis_connection_manager.clone();
let cached_image = redis_connection_manager.get(&cache_key).await?;
match cached_image {
Value::Nil => {
debug!(
"Use database to retrieve an image of a planet by id: {}",
&planet_id
);
let planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let result = crate::db::get_image_of_planet(&planet.name).await;
let _: () = redis::pipe()
.atomic()
.set(&cache_key, result.clone())
.expire(&cache_key, 60)
.query_async(&mut redis_connection_manager)
.await?;
Ok(result)
}
Value::Data(val) => {
debug!(
"Use cache to retrieve an image of a planet by id: {}",
&planet_id
);
Ok(val)
}
_ => Err(RedisError {
message: "Unexpected response from Redis".to_string(),
}),
}
}
// Per-IP, per-minute rate limiting backed by Redis counters.
#[derive(Clone)]
pub struct RateLimitingService {
// Shared multiplexed Redis connection; cloned per operation.
redis_connection_manager: ConnectionManager,
}
impl RateLimitingService {
pub fn new(redis_connection_manager: ConnectionManager) -> Self {
RateLimitingService {
redis_connection_manager,
}
}
// Increments the request counter for this IP in the current minute and
// fails with TooManyRequests once the per-minute limit is exceeded.
pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
// The key embeds the current minute, so counters rotate naturally;
// the EXPIRE below only ensures old keys are cleaned up.
let current_minute = Utc::now().minute();
let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);
// INCR + EXPIRE run atomically; the EXPIRE reply is ignored, so the
// pipeline result is just the post-increment count.
let (count,): (u64,) = redis::pipe()
.atomic()
.incr(&rate_limit_key, 1)
.expire(&rate_limit_key, 60)
.ignore()
.query_async(&mut self.redis_connection_manager.clone())
.await?;
if count > MAX_REQUESTS_PER_MINUTE {
Err(TooManyRequests {
actual_count: count,
permitted_count: MAX_REQUESTS_PER_MINUTE,
})
} else {
Ok(())
}
}
}
In the assert_rate_limit_not_exceeded function, the value is incremented by 1. To make sure we don't fill up the entire database with junk, the key is expired after one minute.

pub async fn get_planets(
req: HttpRequest,
web::Query(query_params): web::Query<GetPlanetsQueryParams>,
rate_limit_service: web::Data<Arc<RateLimitingService>>,
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
// Enforce the per-IP rate limit before doing any work.
rate_limit_service
.assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
.await?;
// Fetch (optionally type-filtered) planets and map them to DTOs for JSON output.
let planets = planet_service.get_planets(query_params.r#type).await?;
Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}
/// Persists a new planet in MongoDB and announces it on the Redis
/// pub/sub channel so SSE subscribers are notified.
pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
    let created = self.mongodb_client.create_planet(planet).await?;
    // Serialize the notification payload before publishing.
    let message = serde_json::to_string(&PlanetMessage::from(&created))?;
    self.redis_connection_manager
        .clone()
        .publish(NEW_PLANETS_CHANNEL_NAME, message)
        .await?;
    Ok(created)
}
// Subscribes to the new-planets Redis channel and bridges incoming messages
// into an mpsc channel whose receiver backs the SSE response stream.
pub async fn get_new_planets_stream(
&self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);
// Greet the subscriber immediately so the client knows the stream is live.
tx.send(Ok(Bytes::from("data: Connected\n\n")))
.await
.expect("Can't send a message to the stream");
// A dedicated connection is required for pub/sub mode.
let mut pubsub_con = self
.redis_client
.get_async_connection()
.await?
.into_pubsub();
pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;
// Forward messages in a background task for as long as the subscription
// yields messages; the task owns the pub/sub connection and the sender.
tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
// Wrap the payload in the SSE wire format ("data: ...\n\n").
let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
tx.send(Ok(msg))
.await
.expect("Can't send a message to the stream");
}
});
Ok(rx)
}
/// SSE endpoint: streams newly created planets to the client as
/// `text/event-stream`.
pub async fn sse(
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
    let receiver = planet_service.get_new_planets_stream().await?;
    let body = tokio_stream::wrappers::ReceiverStream::new(receiver);
    Ok(HttpResponse::build(StatusCode::OK)
        .insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
        .streaming(body))
}
subscription from a browser
Navigate to http://localhost:9000/
where HTML page is accessible:
subscription from a command line using cURL
Use curl -X GET localhost:9000/events
:
curl -X POST -H 'Content-Type: application/json' -d '{
"name": "Pluto",
"type": "DwarfPlanet",
"mean_radius": 1188,
"satellites": null
}' localhost:9000/planets
#[actix_web::main]
async fn main() -> std::io::Result<()> {
...
// Feature flag: mutating endpoints are only registered when enabled,
// e.g. to keep the public demo instance read-only.
let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
.expect("ENABLE_WRITE_HANDLERS env var should be specified")
.parse::<bool>()
.expect("Can't parse ENABLE_WRITE_HANDLERS");
// The closure runs once per worker thread; services are shared via Arc.
HttpServer::new(move || {
let mut app = App::new()
.route("/planets", web::get().to(handlers::get_planets))
.route("/planets/{planet_id}", web::get().to(handlers::get_planet))
.route(
"/planets/{planet_id}/image",
web::get().to(handlers::get_image_of_planet),
)
.route("/events", web::get().to(handlers::sse))
.route("/", web::get().to(handlers::index))
// NOTE(review): `.data(...)` is the actix-web 4 beta API; the 4.0
// final release replaced it with `.app_data(web::Data::new(...))` —
// confirm the crate version before upgrading.
.data(Arc::clone(&planet_service))
.data(Arc::clone(&rate_limiting_service));
if enable_write_handlers {
app = app
.route("/planets", web::post().to(handlers::create_planet))
.route(
"/planets/{planet_id}",
web::put().to(handlers::update_planet),
)
.route(
"/planets/{planet_id}",
web::delete().to(handlers::delete_planet),
);
}
app
})
.bind("0.0.0.0:9000")?
.run()
.await
}
using Docker Compose (docker-compose.yml
):
docker compose up
(or docker-compose up
in older Docker versions)
without Docker
Start the application using cargo run
(in this case, mongodb-redis
service should be disabled in the docker-compose.yml
)
You can try the GET endpoints, for example:

GET http://demo.romankudryashov.com:9000/planets