The CSV files that will be used are created with the create_csvs.py script, using the projects_and_sales.json file. Once we generate the accounts.csv, projects.csv, tokens.csv, and sales.csv files, we are ready to populate our graph database.
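The full create_csvs.py script is in the repository; the sketch below only illustrates the general idea, and the top-level JSON keys and output columns ("accounts", "projects", "tokens", "sales" as lists of records) are assumptions rather than the script's actual layout.

import csv
import json

# Illustrative sketch: split the combined JSON export into one CSV per entity.
with open("projects_and_sales.json") as f:
    data = json.load(f)

for entity in ("accounts", "projects", "tokens", "sales"):
    records = data.get(entity, [])  # assumed top-level keys
    if not records:
        continue
    with open(f"{entity}.csv", "w", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=records[0].keys())
        writer.writeheader()
        writer.writerows(records)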
Our graph will be populated from the CSV files we generated: nodes labeled :Account, :Project, :Contract, and :Token will be created, along with the relationships between them. Data will be loaded with the LOAD CSV clause (an illustrative query follows below), and you can see how that is implemented in the art-blocks-memgraph.py script in the repository. There we have also created and started a stream that will consume sales, which will let us create :Sale nodes.
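The actual import queries live in the art-blocks-memgraph.py script; as an illustration only, loading accounts with LOAD CSV could look something like this (the file path and column name are assumptions, not the script's exact query):

LOAD CSV FROM "/usr/lib/memgraph/import/accounts.csv" WITH HEADER AS row
CREATE (:Account {account_id: row.account_id});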
The messages received from the stream will be JSON, and we want to create new nodes and relationships from the received data. Our transformation module looks like this:

import mgp
import json
@mgp.transformation
def sales(messages: mgp.Messages
          ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        # Each Pulsar message payload is a JSON-encoded sale.
        sale_info = json.loads(message.payload().decode('utf8'))
        # Build one parameterized query per consumed message.
        result_queries.append(
            mgp.Record(
                query=(
                    "CREATE (s:Sale {sale_id: $sale_id, payment_token: $payment_token, price: $price, datetime: $datetime}) "
                    "MERGE (p:Project {project_id: $project_id}) "
                    "CREATE (p)-[:HAS]->(s) "
                    "MERGE (a:Account {account_id: $seller_id}) "
                    "CREATE (a)-[:IS_SELLING]->(s) "
                    "MERGE (b:Account {account_id: $buyer_id}) "
                    "CREATE (b)-[:IS_BUYING]->(s) "
                    "MERGE (t:Token {token_id: $token_id}) "
                    "CREATE (t)-[:IS_SOLD_IN]->(s)"),
                parameters={
                    "project_id": sale_info["project_id"],
                    "seller_id": sale_info["seller_id"],
                    "buyer_id": sale_info["buyer_id"],
                    "token_id": sale_info["token_id"],
                    "sale_id": sale_info["sale_id"],
                    "payment_token": sale_info["payment_token"],
                    "price": sale_info["price"],
                    "datetime": sale_info["datetime"]
                }))
    return result_queries
Each message we receive looks like this:

{
"project_id": "0xa7d8d9ef8d8ce8992df33d8b8cf4aebabd5bd270-10",
"sale_id": "0x84b60fca478019f86cc45080850d4b7c19e47d446d344dbb93d7700ed8b909c9",
"token_id": "0xa7d8d9ef8d8ce8992df33d8b8cf4aebabd5bd270-10000005",
"seller_id": "0x9260ae742f44b7a2e9472f5c299aa0432b3502fa",
"buyer_id": "0x720a4fab08cb746fc90e88d1924a98104c0822cf",
"payment_token": "0x0000000000000000000000000000000000000000",
"price": "690000000000000000",
"block_number": "11675452",
"datetime": "2021-01-17 23:29:47"
}
From each message, we create a node labeled :Sale that will have sale_id, payment_token, price, and datetime properties. Each message also carries information about the project, the accounts that sold and bought the token, and the token itself. From that information, we create nodes labeled :Project, :Account, and :Token. We use the MERGE clause since we don't want to duplicate nodes, but rather connect new sales to existing projects, accounts, or tokens. You can notice that the transformation module is quite simple and intuitive, because we're just writing down all the queries we would run if we were loading one sale into the database by hand.

The transformation module has to be copied to the query_modules
folder in the Docker container where you ran Memgraph. First, check the container_id with:

docker ps

Once you have the container_id, run:

docker cp <PATH_TO_LOCAL_TRANSFORMATION_MODULE> <container_id>:/usr/lib/memgraph/query_modules/<transformation.py>

You can check that the module was copied by entering the container and listing the query_modules directory:

docker exec -it <container_id> bash
cd /usr/lib/memgraph/query_modules
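If Memgraph was already running when you copied the file, the new module still has to be picked up before a stream can reference it. One way to do that, assuming Memgraph's built-in mg utility procedures are available, is to reload the query modules from mgconsole or Memgraph Lab (restarting the container works as well):

CALL mg.load_all();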
Messages are produced with a Pulsar producer that streams data from the sales.csv file. Each message is a row from that CSV file, converted into a dictionary and sent as JSON.

import json
from time import sleep

import pulsar


def producer(ip, port, topic, generate, stream_delay):
    client = pulsar.Client('pulsar://' + ip + ':' + port)
    producer = client.create_producer(topic)
    message = generate()
    while True:
        try:
            # Send each generated record as a JSON-encoded Pulsar message.
            producer.send(json.dumps(next(message)).encode('utf8'))
            sleep(stream_delay)
        except Exception as e:
            print(f"Error: {e}")


def consumer(ip, port, topic, platform):
    client = pulsar.Client('pulsar://' + ip + ':' + port)
    consumer = client.subscribe(topic, 'my-subscription')
    while True:
        msg = consumer.receive()
        try:
            print(platform, ": ", msg.data())
            consumer.acknowledge(msg)
        except:
            # Processing failed; mark the message for redelivery.
            consumer.negative_acknowledge(msg)
    client.close()
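The generate argument is expected to be a callable that returns a generator yielding one dictionary per sales.csv row; its real implementation is in the repository. A minimal sketch of such a generator, with a made-up name and path argument for illustration:

import csv

def generate_sales(path='sales.csv'):
    # Yield each CSV row as a dictionary so the producer can serialize it to JSON.
    with open(path, newline='') as f:
        for row in csv.DictReader(f):
            yield row

Passing generate_sales as the generate argument would then stream one row every stream_delay seconds.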
If you now run:

python3 start.py --platforms pulsar --dataset art-blocks

you'll see messages being consumed, which means we have created the producer correctly. Next, we create a stream in Memgraph with the CREATE PULSAR STREAM clause. You can see that everything is set up in the art-blocks-memgraph service. The query for creating a Pulsar stream is:

CREATE PULSAR STREAM sales_stream
TOPICS sales
TRANSFORM artblocks.sales
SERVICE_URL 'pulsar://pulsar:6650';
With this query, we create a stream named sales_stream that consumes messages from the topic sales and transforms those messages into queries with the artblocks.sales transformation module. SERVICE_URL is the URL of the running Pulsar cluster. Now we're going to start that stream with:

START STREAM sales_stream;
You can check that the stream was created and is running with:

SHOW STREAMS;
All of this is already set up in the art-blocks-memgraph service, so you can simply start it with:

docker-compose up art-blocks-memgraph
Once the service is running, new :Sale nodes are being created, and you can verify that by running a simple query and checking the results. For example, to find the accounts with the largest number of sales:

MATCH (a:Account)-[r:IS_SELLING]->(s:Sale)
WITH a, count(s) AS num_of_sales
WHERE num_of_sales > 1
RETURN a.account_id, num_of_sales
ORDER BY num_of_sales DESC
LIMIT 10;
To get the accounts that bought the most tokens, just replace the :IS_SELLING relationship with :IS_BUYING and you'll get your results; a sketch of that variant follows.
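Keeping everything else the same, the buyer-side query might look like this (only the relationship type and the count alias change):

MATCH (a:Account)-[r:IS_BUYING]->(s:Sale)
WITH a, count(s) AS num_of_purchases
WHERE num_of_purchases > 1
RETURN a.account_id, num_of_purchases
ORDER BY num_of_purchases DESC
LIMIT 10;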
To count the number of sales that happened on a particular day, you can filter on the datetime property:

MATCH (n:Sale)
WITH n.datetime STARTS WITH "2021-01-30" as value
RETURN count(CASE WHEN value = true THEN 1 END) AS number_of_sales;
To list all the days on which there were sales, extract the date part of the datetime property:

MATCH (n:Sale)
WITH collect(n.datetime) as dates
UNWIND dates AS date
WITH substring(date, 0, 10) AS day
WITH collect(day) AS days
UNWIND days as one_day
RETURN DISTINCT one_day;
Finally, to find the ten most expensive sales:

MATCH (n:Sale)
WITH toInteger(n.price) AS price, n.sale_id AS sale
RETURN sale, price
ORDER BY price DESC
LIMIT 10;