I define the events with dataclasses_json and will use the psycopg2 driver, which is the most frequently used with Python. This requires:

pip install psycopg2 dataclasses_json

Here is the generate_events code from the MaibornWolff project:

import random
import string
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from datetime import datetime, timezone
@dataclass_json
@dataclass
class Event:
    timestamp: datetime
    device_id: str
    sequence_number: int
    temperature: float

def _rand_string(length):
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def generate_events(device_id, start_timestamp, num_events, sequence_number=1, device_spread=1):
    device_ids = [f"{_rand_string(4)}{device_id}{_rand_string(4)}" for i in range(device_spread)]
    if start_timestamp == 0:
        start_timestamp = int(datetime.now().timestamp()*1000)
    for i in range(num_events):
        event = Event(
            datetime.fromtimestamp(start_timestamp/1000, timezone.utc)
            , device_ids[i%device_spread], sequence_number, random.uniform(-20, 35))
        yield event
        sequence_number += 1
        start_timestamp += random.randint(5, 10)*60
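
To see what the generator yields, here is a quick check (my own snippet, not from the original project). The @dataclass_json decorator adds a to_json() method to Event:

# illustration only: print two generated events as JSON
for event in generate_events(device_id=1, start_timestamp=0, num_events=2):
    print(event.to_json())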
I create the iot_demo table to store those events:

import psycopg2
yb = psycopg2.connect('postgres://yugabyte:yugabyte@localhost:5433/yugabyte')
yb.cursor().execute("""
drop table if exists iot_demo;
create table if not exists iot_demo (
 timestamp timestamp,
 device_id text,
 sequence_number bigint,
 temperature real,
 primary key(device_id,timestamp,sequence_number)
);
""")
yb.commit()  # psycopg2 does not autocommit, so the DDL must be committed
Because generate_events is a generator (there's a yield clause to return each event in the loop), I just loop on it to build a string version of the events (tab-separated columns):

import io
import psycopg2
def load_events_with_copy_psycopg2(device_id,num_events):
    yb=psycopg2.connect('postgres://yugabyte:yugabyte@localhost:5433/yugabyte')
    ysql=yb.cursor()
    ysql.execute("set yb_default_copy_from_rows_per_transaction=1000")
    tsv=io.StringIO()
    for event in generate_events(
            device_id=device_id, start_timestamp=0
            , num_events=num_events, sequence_number=1
            , device_spread=1):
        tsv.write(f'{event.timestamp}\t{event.device_id}\t{event.sequence_number}\t{event.temperature}\n')
    tsv.seek(0)
    ysql.copy_from(tsv,'iot_demo',sep="\t",columns=('timestamp', 'device_id', 'sequence_number', 'temperature'))
    tsv.seek(0)
    tsv.truncate(0)
    yb.commit()
    ysql.close()
copy_from() reads from a file, but here I generate the events into an in-memory StringIO that copy_from() reads like a file. Don't forget to seek(0) after writing to it, so that copy_from() starts at the beginning, and truncate(0) when done.
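
Here is a standalone illustration of that StringIO pattern (my own snippet):

# write, rewind, read, then empty the buffer for reuse
buf = io.StringIO()
buf.write('2023-01-01 00:00:00\tdev1\t1\t21.5\n')  # one tab-separated line
buf.seek(0)        # rewind so a reader starts at the beginning
print(buf.read())  # copy_from() consumes the buffer the same way
buf.seek(0)
buf.truncate(0)    # buffer is now empty, ready for the next batch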
psycopg2 also provides copy_expert, which allows for more options:

import pandas
import psycopg2
def load_events_with_copy_from_psycopg2(device_id,num_events):
    yb=psycopg2.connect('postgres://yugabyte:yugabyte@localhost:5433/yugabyte')
    ysql=yb.cursor()
    ysql.execute("set yb_default_copy_from_rows_per_transaction=1000")
    events=[]
    for event in generate_events(
            device_id=device_id, start_timestamp=0
            , num_events=num_events, sequence_number=1
            , device_spread=1):
        events.append(event)
    csv=io.StringIO(pandas.DataFrame(events).to_csv(header=True,index=False))
    ysql.copy_expert("""
      COPY iot_demo(timestamp,device_id,sequence_number,temperature)
      FROM STDIN WITH DELIMITER ',' CSV HEADER
      """,csv)
    yb.commit()
    ysql.close()
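
A quick way to check the result (my own snippet, reusing the connection string from above):

# load 1000 events and count the rows in the table
load_events_with_copy_from_psycopg2(device_id=1, num_events=1000)
yb = psycopg2.connect('postgres://yugabyte:yugabyte@localhost:5433/yugabyte')
ysql = yb.cursor()
ysql.execute("select count(*) from iot_demo")
print(ysql.fetchone()[0])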
To load in parallel, I run this from multiple threads:

import threading
def loop_in_thread(device_id,num_loops,num_events):
    t1=datetime.now().timestamp()
    for l in range(num_loops):
        load_events_with_copy_psycopg2(device_id,num_events)
    t2=datetime.now().timestamp()
    print(f'{device_id:2d}: {(num_events)/(t2-t1):8.2f} rows/seconds/thread')

threads=[]
for i in range(3):
    t=threading.Thread(target=loop_in_thread,args=[i,10,10000])
    threads.append(t)
    t.start()
for t in threads:
    t.join()
[opc@C tmp]$ python3 copy_from.py
0: 724.47 rows/seconds/thread
1: 709.02 rows/seconds/thread
2: 687.75 rows/seconds/thread
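
The same parallel run can be written with concurrent.futures, an equivalent sketch of the threading code above:

from concurrent.futures import ThreadPoolExecutor
# each call opens its own connection inside load_events_with_copy_psycopg2
with ThreadPoolExecutor(max_workers=3) as pool:
    for i in range(3):
        pool.submit(loop_in_thread, i, 10, 10000)
# leaving the with-block waits for all workers, like the join() calls above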
The yb_default_copy_from_rows_per_transaction parameter has been introduced, with a default value of 1000, to avoid memory issues when loading large files on small servers. Setting it to 0 reverts to the PostgreSQL behavior, a single transaction for the whole COPY, in case you want to be sure that everything is rolled back in case of failure.
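
For example, to get that all-or-nothing behavior (sketch; ysql is any open psycopg2 cursor):

ysql.execute("set yb_default_copy_from_rows_per_transaction=0")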