The code is available in this GitHub repo - https://github.com/abhirockzz/redis-streams-in-action
The monitoring app fetches pending messages (`XPENDING`), claims them (`XCLAIM`), processes them (stores them as a `HASH` using `HSET`), and finally acknowledges them (`XACK`).

Configure the following environment variables:

- `REDIS_HOST` - host and port for the Redis instance, e.g. `myredis:10000`
- `REDIS_PASSWORD` - access key (password) for the Redis instance
- `STREAM_NAME` - the name of the Redis Stream (use `tweets_stream` as the value)
- `STREAM_CONSUMER_GROUP_NAME` - name of the Redis Streams consumer group (use `redisearch_app_group` as the value)
- `MONITORING_CONSUMER_NAME` - name of the consumer instance represented by the monitoring app (it is part of the aforementioned consumer group)
- `MIN_IDLE_TIME_SEC` - only pending messages that have been idle longer than this interval will be claimed

Clone the repository and build the app:

```shell
git clone https://github.com/abhirockzz/redis-streams-in-action
cd redis-streams-in-action/monitoring-app
GOOS=linux go build -o processor_monitor cmd/main.go
```

`GOOS=linux` is used to build a Linux executable, since we chose a Linux OS for our Function App.
Deploy the app to Azure:

```shell
func azure functionapp publish <enter name of the Azure Function app>
```

You should see output similar to this:

```
Getting site publishing info...
Uploading package...
Uploading 3.71 MB [###############################################################################]
Upload completed successfully.
Deployment completed successfully.
Syncing triggers...
Functions in streams-monitor:
    monitor - [timerTrigger]
```
The `function.json` for the `monitor` function defines a `timerTrigger` that fires every 20 seconds:

```json
{
    "bindings": [
        {
            "type": "timerTrigger",
            "direction": "in",
            "name": "req",
            "schedule": "*/20 * * * * *"
        }
    ]
}
```
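For reference, the `schedule` uses the six-field NCRONTAB format supported by Azure Functions timer triggers, where the extra leading field represents seconds - so `*/20 * * * * *` fires every 20 seconds. A few illustrative values:

```
*/20 * * * * *    every 20 seconds (the value used above)
0 */5 * * * *     at second 0 of every 5th minute
0 30 9 * * *      every day at 09:30:00
```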
To check pending messages:

```
XPENDING tweets_stream redisearch_app_group

1) (integer) 209
2) "1620973121009-0"
3) "1621054539960-0"
4) 1) 1) "consumer-1f20d41d-e63e-40d2-bc0f-749f11f15026"
      2) "3"
   2) 1) "monitoring_app"
      2) "206"
```
Notice that the number of pending messages for `monitoring_app` (the name of our consumer) is `206` - it claimed these from other consumer instance(s). Once these messages have been claimed, their ownership moves from their original consumer to the `monitoring_app` consumer. You can verify this by running `XPENDING tweets_stream redisearch_app_group` again, but it might be hard to catch since the messages get processed pretty quickly.
Out of the `206` messages that were claimed, only the ones that have been idle for at least `10` seconds (the `MIN_IDLE_TIME_SEC` we specified) will be processed - the others will be ignored and picked up by the next `XPENDING` call (if they are still unprocessed). This is because we want to give our consumer application some time to finish its work - 10 seconds is a pretty generous time-frame for processing that involves saving to a `HASH` using `HSET` followed by `XACK`. Please note that the 10-second interval is just an example; you should choose this value based on the end-to-end latency requirements of your data pipelines/processing.
Try a few `RediSearch` queries to validate that you can search for tweets based on multiple criteria:

```
FT.SEARCH tweets-index hello
FT.SEARCH tweets-index hello|world
FT.SEARCH tweets-index "@location:India"
FT.SEARCH tweets-index "@user:jo* @location:India"
FT.SEARCH tweets-index "@user:jo* | @location:India"
FT.SEARCH tweets-index "@hashtags:{cov*}"
FT.SEARCH tweets-index "@hashtags:{cov*|Med*}"
```
Please refer to the code on GitHub. We start by connecting to the Redis instance (note the use of `TLS`):

```go
client := redis.NewClient(&redis.Options{Addr: host, Password: password, TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}})

err = client.Ping(context.Background()).Err()
if err != nil {
	log.Fatal(err)
}
```
`XPENDING` is used to detect the number of pending messages, e.g. `XPENDING tweets_stream group1`:

```go
numPendingMessages := client.XPending(context.Background(), streamName, consumerGroupName).Val().Count
```
This is followed by the extended form of `XPENDING`, to which we pass the number of messages obtained in the previous call:

```go
xpendingResult := client.XPendingExt(context.Background(), &redis.XPendingExtArgs{Stream: streamName, Group: consumerGroupName, Start: "-", End: "+", Count: numPendingMessages})
```
These messages are then claimed by the consumer (`monitoringConsumerName`) whose name we specified:

```go
xclaim := client.XClaim(context.Background(), &redis.XClaimArgs{Stream: streamName, Group: consumerGroupName, Consumer: monitoringConsumerName, MinIdle: time.Duration(minIdleTimeSec) * time.Second, Messages: toBeClaimed})
```
Each claimed message is processed by saving it to a `HASH` (using `HSET`) and acknowledging successful processing (`XACK`). Goroutines are used to keep things efficient: for example, if we get `100` claimed messages in a batch, a scatter-gather process is followed where a goroutine is spawned to process each of these messages. A `sync.WaitGroup` is used to wait for the current batch to complete before looking for the next set of pending messages (if any):

```go
for _, claimed := range xclaim.Val() {
	if exitSignalled {
		return
	}
	waitGroup.Add(1)
	go func(tweetFromStream redis.XMessage) {
		defer waitGroup.Done()
		hashName := fmt.Sprintf("%s%s", indexDefinitionHashPrefix, tweetFromStream.Values["id"])
		// use the goroutine's own copy of the message, not the loop variable
		err := client.HSet(context.Background(), hashName, tweetFromStream.Values).Err()
		if err != nil {
			return // don't proceed (ACK) if HSET fails
		}
		if err := client.XAck(context.Background(), streamName, consumerGroupName, tweetFromStream.ID).Err(); err != nil {
			return
		}
	}(claimed)
}
waitGroup.Wait()
```
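The scatter-gather shape of this loop can be seen in isolation with a minimal, self-contained sketch (no Redis involved; `processBatch` is a hypothetical stand-in where the worker just records that it ran):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processBatch spawns one goroutine per item and waits for the whole
// batch to finish before returning - the same pattern the monitoring
// app uses for a batch of claimed messages.
func processBatch(items []string) int64 {
	var processed int64
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			_ = it // stand-in for the real work (HSET + XACK)
			atomic.AddInt64(&processed, 1)
		}(item)
	}
	wg.Wait() // block until the current batch completes
	return processed
}

func main() {
	fmt.Println(processBatch([]string{"a", "b", "c"})) // 3
}
```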
Here is what the project structure looks like:

```
.
├── cmd
│   └── main.go
├── monitor
│   └── function.json
├── go.mod
├── go.sum
├── host.json
```
`host.json` tells the Functions host where to send requests by pointing to a web server capable of processing HTTP events. Notice `customHandler.description.defaultExecutablePath`, which defines that `processor_monitor` is the name of the executable that will be used to run the web server:

```json
{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    },
    "customHandler": {
        "description": {
            "defaultExecutablePath": "processor_monitor"
        },
        "enableForwardingHttpRequest": true
    },
    "logging": {
        "logLevel": {
            "default": "Trace"
        }
    }
}
```
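For context, a Go custom handler is just a web server that listens on the port the Functions host provides via the `FUNCTIONS_CUSTOMHANDLER_PORT` environment variable, with one route per function. The sketch below shows the wiring under those assumptions; `runMonitorPass`, `monitorHandler`, and the local fallback behavior are illustrative, not copied from the repo:

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

// runMonitorPass is where the XPENDING/XCLAIM/process logic would go;
// here it is a stub that simply reports success.
func runMonitorPass() int {
	// ... claim and process pending messages ...
	return http.StatusOK
}

// monitorHandler is invoked by the Functions host each time the timer fires.
func monitorHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(runMonitorPass())
}

func main() {
	// the Functions host tells the custom handler which port to listen on
	port, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT")
	if !ok {
		// not running under the Functions host - just demonstrate the handler wiring
		fmt.Println("monitor pass status:", runMonitorPass())
		return
	}
	http.HandleFunc("/monitor", monitorHandler)
	http.ListenAndServe(":"+port, nil)
}
```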
RediSearch. It set the scene for the rest of the series.