20
loading...
This website collects cookies to deliver better user experience
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
"math/rand"
"os"
"strconv"
"time"
)
func main() {
MIN := 0
MAX := 0
TOTAL := 0
topic := ""
if len(os.Args) > 4 {
MIN, _ = strconv.Atoi(os.Args[1])
MAX, _ = strconv.Atoi(os.Args[2])
TOTAL, _ = strconv.Atoi(os.Args[3])
topic = os.Args[4]
} else {
fmt.Println("Usage:", os.Args[0], "MIX MAX TOTAL TOPIC")
return
}
type Record struct {
Name string `json:"name"`
Random int `json:"random"`
}
func random(min, max int) int {
return rand.Intn(max-min) + min
}
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
fmt.Printf("%s\n", err)
return
}
rand.Seed(time.Now().Unix())
for i := 0; i < TOTAL; i++ {
myrand := random(MIN, MAX)
temp := Record{strconv.Itoa(i), myrand}
recordJSON, _ := json.Marshal(temp)
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
conn.WriteMessages( kafka.Message{Value: []byte(recordJSON)},)
if i%50 == 0 {
fmt.Print(".")
}
time.Sleep(10 * time.Millisecond)
}
fmt.Println()
conn.Close()
}
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
"os"
)
type Record struct {
Name string `json:"name"`
Random int `json:"random"`
}
func main() {
if len(os.Args) < 2 {
fmt.Println("Need a Kafka topic name.")
return
}
partition := 0
topic := os.Args[1]
fmt.Println("Kafka topic:", topic
r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: topic, Partition: partition, MinBytes: 10e3, MaxBytes: 10e6, })
r.SetOffset(0)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset,string(m.Key), string(m.Value))
temp := Record{}
err = json.Unmarshal(m.Value, &temp)
if err != nil {
fmt.Println(err)
}
fmt.Printf("%T\n", temp)
}
r.Close()
}