PLPython 常见问题
本文档介绍 PLPython 相关的常见问题。
1 PLPython 和 Python 连接数据库操作的区别与优劣是什么?
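直接使用 Python + 连接库(如 psycopg2)操作数据库时,数据要传输到客户端,计算在客户端单机上完成;PLPython 则能把计算下沉到数据库内部执行,利用数据库的分布式计算能力,并减少数据传输。代价是 plpython3u 属于不受信任(untrusted)语言,只有超级用户才能创建此类函数,且函数代码直接运行在数据库服务器进程中。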
2 如何在数据库中使用 PLPython
只需在 YMatrix 中创建如下扩展,即可在数据库中使用 PLPython。
=# CREATE EXTENSION plpython3u;
3 PLPython 中如果需要第三方的库怎么安装?
在所有 YMatrix 服务器(Master 及所有 Segment 所在主机)上,使用 plpython3u 所对应 Python 3 环境中的 pip3 安装即可。
4 Python 访问 YMatrix 数据库示例
源代码:
# -*- coding: utf-8 -*-
import psycopg2
class MatrixDB(object):
    """Minimal psycopg2 client for the YMatrix ``data`` demo table.

    Connection settings default to the demo cluster used in this FAQ; pass
    keyword arguments to point the example at another cluster instead of
    editing credentials in the source.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection; the caller must close it."""
        conn = psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)
        return conn

    def _execute(self, sql, params=None, fetch=False):
        """Run one statement on a fresh connection and commit it.

        :param sql: statement text, with ``%s`` placeholders for *params*.
        :param params: sequence of values bound by psycopg2, or None.
        :param fetch: when true, return the first result row.
        :return: the first row (a tuple, or None if there is none) when
            *fetch* is true, otherwise None.
        """
        conn = self.get_conn()
        try:
            # ``with conn`` commits on success and rolls back on error, but it
            # does not close the connection -- the ``finally`` does that, so
            # the connection is released even when execute() raises.
            with conn, conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchone() if fetch else None
        finally:
            conn.close()

    def create_table(self):
        """Create the ``data`` table with ``Distributed by(tag_id)``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        self._execute(sql)

    def insert(self):
        """Insert one fixed sample row stamped with the server's now()."""
        self._execute("INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)")

    def select(self):
        """Return the first row of ``SELECT * FROM data`` (None if the table is empty)."""
        return self._execute("SELECT * FROM data", fetch=True)
if __name__ == '__main__':
    db = MatrixDB()
    # create_table() issues a plain CREATE TABLE, so it errors if "data" exists.
    db.create_table()
    db.insert()
    first_row = db.select()
    print(first_row)
5 Python 批量插入数据示例
源代码:
# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO
class MatrixDB(object):
    """psycopg2 demo client that batch-loads random rows into the ``data`` table.

    Two batch strategies are shown: a single multi-row ``INSERT``
    (:meth:`insert`) and ``COPY FROM`` an in-memory text buffer
    (:meth:`copybatch`). Connection settings default to the demo cluster;
    pass keyword arguments to target another cluster instead of editing
    credentials in the source.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection; the caller must close it."""
        conn = psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)
        return conn

    def _execute(self, sql, params=None, fetch=False):
        """Run one statement on a fresh connection and commit it.

        :param sql: statement text, with ``%s`` placeholders for *params*.
        :param params: sequence of values bound by psycopg2, or None.
        :param fetch: when true, return the first result row.
        :return: the first row (a tuple, or None if there is none) when
            *fetch* is true, otherwise None.
        """
        conn = self.get_conn()
        try:
            # ``with conn`` commits on success and rolls back on error, but it
            # does not close the connection -- the ``finally`` does that.
            with conn, conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchone() if fetch else None
        finally:
            conn.close()

    @staticmethod
    def _random_row():
        """Return one random ``(tag_id, metrics1, metrics2, metrics3)`` tuple.

        tag_id is an int in [0, 100]; each metric is a float in [0, 1]
        rounded to two decimals.
        """
        tag_id = round(random.random() * 100)
        metrics1 = round(random.random(), 2)
        metrics2 = round(random.random(), 2)
        metrics3 = round(random.random(), 2)
        return tag_id, metrics1, metrics2, metrics3

    def create_table(self):
        """Create the ``data`` table with ``Distributed by(tag_id)``."""
        sql = ("CREATE TABLE data("
               "time timestamp,"
               "tag_id int,"
               "metrics1 float8,"
               "metrics2 float8,"
               "metrics3 float8"
               ")Distributed by(tag_id)")
        self._execute(sql)

    def insert(self, num):
        """Insert *num* random rows with one multi-row ``INSERT`` statement.

        The values are bound as query parameters instead of being spliced into
        the SQL text, and ``time`` is filled by the server-side ``now()`` call.
        When *num* <= 0 nothing is sent (an empty VALUES list is invalid SQL).
        Start/end timestamps and every generated row are printed.
        """
        start_time = time.time()
        print("start_time:", start_time)
        rows = []
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            rows.append(row)
        if rows:
            # One "(now(), %s, %s, %s, %s)" group per row; params are the
            # rows flattened in the same order.
            placeholders = ",".join(["(now(), %s, %s, %s, %s)"] * len(rows))
            params = [value for row in rows for value in row]
            print("batch insert data")
            self._execute("insert into data values " + placeholders, params)
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def copybatch(self, num):
        """Load *num* random rows via ``COPY FROM`` an in-memory text buffer.

        Only tag_id and the three metrics are copied. ``time`` is not in the
        column list, so (the table defining no default for it) it is left NULL
        for these rows. When *num* <= 0 no connection is opened.
        """
        start_time = time.time()
        print("start_time:", start_time)
        buf = StringIO()
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            # One tab-separated line per row, matching sep='\t' below.
            buf.write("\t".join(str(value) for value in row) + "\n")
        print("insert_data", buf.getvalue())
        if num > 0:
            buf.seek(0)
            conn = self.get_conn()
            try:
                with conn, conn.cursor() as cursor:
                    # sep and null are spelled out for clarity (they equal
                    # psycopg2's defaults '\t' and '\\N'); size raises the read
                    # chunk from the default 8192 bytes.
                    cursor.copy_from(buf, "data",
                                     columns=("tag_id", "metrics1", "metrics2", "metrics3"),
                                     sep='\t', null='\\N', size=16384)
            finally:
                conn.close()
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def select(self):
        """Return ``SELECT count(*) FROM data`` as a one-element tuple."""
        return self._execute("SELECT count(*) FROM data", fetch=True)
if __name__ == '__main__':
    db = MatrixDB()
    # Other steps of the demo -- uncomment as needed:
    #   db.create_table()      # first run only
    #   db.insert(3)           # multi-row INSERT instead of COPY
    #   print(db.select())
    db.copybatch(3)
    row_count = db.select()
    print(row_count)
6 PLPython 中使用 Python 代码
在 PLPython 中使用自己编写的 Python 代码,方式与普通 Python 程序调用其它目录下的模块相同:只需通过 sys.path.append() 把模块所在目录加入 sys.path,然后 import 即可。
示例
在数据库服务器上的 /home/mxadmin/python/demo.py 文件中
写入如下代码:
# User-defined helper that PL/Python functions can import from this module.
def print_and_return(anything):
    """Print *anything* with a ``print_and_return:`` prefix and return it unchanged.

    :param anything: any object; it is formatted with ``str.format``.
    :return: the very same object that was passed in.
    """
    print("print_and_return: {}".format(anything))
    return anything


# Quick self-test. The guard makes it run only for ``python3 demo.py``, not on
# every ``from demo import print_and_return`` executed inside the database.
if __name__ == "__main__":
    print_and_return("hello_world")
    print_and_return(["hello", "world"])
在 PLPython 中 import 该函数:
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$
import sys
# Directory on the database server that holds our own modules (demo.py).
code_dir = '/home/mxadmin/python/'
# The Python interpreter, and with it sys.path, persists across calls in a
# session; appending unconditionally would grow sys.path on every call.
if code_dir not in sys.path:
    sys.path.append(code_dir)
from demo import print_and_return
# Call the user-defined function; it prints the message and returns it
# (the returned value is not needed here).
print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))
# Return the timestamp argument, as text, to SQL.
return str(my_time)
$$ language plpython3u;
输出如下:
mydb=# SELECT call_my_function('a', now(), 1);
call_my_function
-------------------------------
2022-11-17 17:14:29.618774+08
(1 row)
7 PLPython 标准异常处理
源代码:
-- Demonstrates PL/Python's default handling of an uncaught Python exception.
-- Line numbers in the error CONTEXT are counted within the function body, so
-- adding or removing lines between the $$ markers changes them.
create or replace function call_my_exception()
returns text
as $$
import io
import sys  # not used in this example
from contextlib import redirect_stdout
import traceback  # not used in this example
# Helper that prints a message and then deliberately raises IndexError.
def print_exception():
    print("exception will happen")
    empty_list = []
    empty_list[1] += 1  # index 1 does not exist in an empty list -> IndexError
# Capture stdout into a string; the exception is NOT caught here.
my_print = io.StringIO()
with redirect_stdout(my_print):
    print_exception()
# Never reached: the uncaught IndexError aborts the function and is reported as an SQL ERROR.
return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;
输出如下:
mydb=# SELECT call_my_exception();
ERROR: IndexError: list index out of range
CONTEXT: Traceback (most recent call last):
PL/Python function "call_my_exception", line 14, in <module>
print_exception()
PL/Python function "call_my_exception", line 10, in print_exception
empty_list[1] += 1
PL/Python function "call_my_exception"
8 使用 traceback 处理 PLPython 异常
源代码:
-- Catches the Python exception inside the function and returns the formatted
-- traceback as the function's text result instead of raising an SQL ERROR.
-- The traceback line numbers depend on the exact line layout of the body.
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
RETURNS text
LANGUAGE plpython3u
AS $function$
import io
import sys  # not used in this example
from contextlib import redirect_stdout
import traceback
# Helper that triggers IndexError, catches it and prints the formatted traceback.
def print_traceback_exception():
    print("exception will happen")
    empty_list = []
    try:
        empty_list[1] += 1
    except Exception:
        # traceback.format_exc() returns the current exception's traceback as a string.
        print(traceback.format_exc())
# Capture the helper's stdout (message + traceback) into a string.
my_print = io.StringIO()
with redirect_stdout(my_print):
    print_traceback_exception()
# Alternatively, emit the captured text as an INFO message: plpy.info(my_print.getvalue())
# Return the captured output to SQL as the function result.
return "to sql: {}".format(my_print.getvalue())
$function$;
输出如下:
mydb=# SELECT call_my_tb_exception();
call_my_tb_exception
----------------------------------------------------------
to sql: exception will happen +
Traceback (most recent call last): +
File "<string>", line 12, in print_traceback_exception+
IndexError: list index out of range +
+
(1 row)