400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
YMatrix 文档
关于 YMatrix
标准集群部署
数据写入
数据迁移
数据查询
运维监控
参考指南
工具指南
数据类型
存储引擎
执行引擎
系统配置参数
SQL 参考
常见问题(FAQ)
新架构 FAQ
集群部署 FAQ
SQL 查询 FAQ
MatrixGate FAQ
运维 FAQ
监控告警 FAQ
PXF FAQ
PLPython FAQ
性能 FAQ
本文档介绍 PLPython 相关的常见问题。
直接使用 Python + 连接库操作数据库,行为是单机的;PLPython 能够下沉计算到数据库,利用分布式的计算能力。
只需在 YMatrix 中创建如下扩展,即可在数据库中使用 PLPython。
=# CREATE EXTENSION plpython3u;
直接使用 pip3 在所有 YMatrix 服务器安装即可。
源代码:
# -*- coding: utf-8 -*-
import psycopg2
class MatrixDB(object):
    """Small psycopg2 helper used to demonstrate basic YMatrix access.

    Each public operation opens its own connection, runs one statement,
    commits, and always closes the connection, even when the statement
    fails.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values from the documentation; pass
        your own values to point the helper at a different cluster.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection from the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def _run(self, sql, fetch_one=False):
        """Execute ``sql`` on a fresh connection and commit.

        Returns the first result row when ``fetch_one`` is true, otherwise
        ``None``. The connection is closed in ``finally`` so a failing
        statement does not leak it; closing without a commit discards the
        open transaction.
        """
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            conn.close()

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = "CREATE TABLE data(" \
              "time timestamp," \
              "tag_id int," \
              "metrics1 float8," \
              "metrics2 float8," \
              "metrics3 float8" \
              ")Distributed by(tag_id)"
        self._run(sql)

    def insert(self):
        """Insert one fixed sample row into ``data``."""
        self._run("INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)")

    def select(self):
        """Return the first row of ``SELECT * FROM data`` (``None`` if empty)."""
        return self._run("SELECT * FROM data", fetch_one=True)
if __name__ == '__main__':
    # Demo run: create the table, add one row, then read a row back.
    db = MatrixDB()
    db.create_table()
    db.insert()
    first_row = db.select()
    print(first_row)
源代码:
# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO
class MatrixDB(object):
    """psycopg2 helper demonstrating batch INSERT and COPY loading.

    Each public operation opens its own connection and always closes it,
    even when the statement fails.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values from the documentation; pass
        your own values to point the helper at a different cluster.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection from the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    @staticmethod
    def _random_row():
        """Return one random ``(tag_id, metrics1, metrics2, metrics3)`` tuple."""
        tag_id = round(random.random() * 100)
        metrics1 = round(random.random(), 2)
        metrics2 = round(random.random(), 2)
        metrics3 = round(random.random(), 2)
        return tag_id, metrics1, metrics2, metrics3

    @staticmethod
    def _build_insert_sql(row_count):
        """Return a multi-row INSERT with four ``%s`` placeholders per row."""
        placeholders = ",".join(["(now(),%s,%s,%s,%s)"] * row_count)
        return "insert into data values " + placeholders + ";"

    @staticmethod
    def _format_copy_rows(rows):
        """Render rows as tab-separated, newline-terminated COPY text."""
        return "".join(
            "\t".join(str(value) for value in row) + "\n" for row in rows
        )

    def _execute(self, sql, fetch_one=False):
        """Run ``sql`` on a fresh connection, commit, and always close it.

        Returns the first result row when ``fetch_one`` is true, else ``None``.
        """
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            conn.close()

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = "CREATE TABLE data(" \
              "time timestamp," \
              "tag_id int," \
              "metrics1 float8," \
              "metrics2 float8," \
              "metrics3 float8" \
              ")Distributed by(tag_id)"
        self._execute(sql)

    def insert(self, num):
        """Insert ``num`` random rows with one multi-row INSERT statement.

        Values are passed as query parameters instead of being spliced into
        the SQL text. When ``num`` is zero or negative there is nothing to
        insert, so no connection is opened and no statement is sent (an
        INSERT with an empty VALUES list would be invalid SQL).
        Start/end timestamps and the elapsed time are printed.
        """
        start_time = time.time()
        print("start_time:", start_time)
        rows = []
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            rows.append(row)
        if rows:
            sql = self._build_insert_sql(len(rows))
            # Flatten the row tuples to line up with the %s placeholders.
            params = [value for row in rows for value in row]
            print("insert data str:", sql, params)
            print("batch insert data")
            conn = self.get_conn()
            try:
                cursor = conn.cursor()
                cursor.execute(sql, params)
                conn.commit()
            finally:
                conn.close()
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def copybatch(self, num):
        """Load ``num`` random rows into ``data`` through COPY.

        Only tag_id and the three metrics columns are supplied; the
        ``time`` column is not part of the copied column list.
        Start/end timestamps and the elapsed time are printed.
        """
        start_time = time.time()
        print("start_time:", start_time)
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            rows = []
            for i in range(num):
                row = self._random_row()
                print(i, "random:", *row)
                rows.append(row)
            insert_data = self._format_copy_rows(rows)
            print("insert_data", insert_data)
            f = StringIO()
            f.write(insert_data)
            f.seek(0)  # rewind so copy_from reads from the beginning
            # sep and null are spelled out for clarity; they match the
            # psycopg2 defaults (tab separator, \N for NULL).
            cursor.copy_from(f, "data",
                             columns=("tag_id", "metrics1", "metrics2", "metrics3"),
                             sep='\t', null='\\N', size=16384)
            conn.commit()
            end_time = time.time()
            print("end_time:", end_time, end_time - start_time)
        finally:
            # The connection was previously left open on this path.
            conn.close()

    def select(self):
        """Return the single row produced by ``SELECT count(*) FROM data``."""
        return self._execute("SELECT count(*) FROM data", fetch_one=True)
if __name__ == '__main__':
    # Demo run: load three random rows through COPY, then print the row count.
    # The create_table() / insert(3) steps are left disabled, as in the
    # original sample; enable them when the table does not exist yet.
    db = MatrixDB()
    db.copybatch(3)
    row_count = db.select()
    print(row_count)
在 PLPython 中调用已有的 Python 代码,方式与在 Python 中调用其它 workspace 下的 Python 程序一样,只需通过 sys.path.append 添加对应路径即可。
示例
在 /home/mxadmin/python/demo.py
写入如下代码:
def print_and_return(anything):
    """Print ``anything`` with a fixed prefix and return it unchanged."""
    message = "print_and_return: {}".format(anything)
    print(message)
    return anything
# Quick check of the helper with a string and with a list.
for sample in ("hello_world", ["hello", "world"]):
    print_and_return(sample)
在 PLPython 中 import 该函数:
-- Calls print_and_return() from /home/mxadmin/python/demo.py with a message
-- built from the three arguments, then returns my_time as text.
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$
import sys

# Make the directory that holds demo.py importable. The check avoids adding
# the same entry to sys.path again on every call.
module_dir = '/home/mxadmin/python/'
if module_dir not in sys.path:
    sys.path.append(module_dir)

from demo import print_and_return

# Call the helper defined in demo.py
result = print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))

# Hand the timestamp back to SQL as text
return str(my_time)
$$ language plpython3u;
输出如下:
mydb=# SELECT call_my_function('a', now(), 1);
call_my_function
-------------------------------
2022-11-17 17:14:29.618774+08
(1 row)
源代码:
-- Demonstrates an uncaught Python exception inside a PL/Python function.
-- print_exception() prints a message and then executes `empty_list[1] += 1`
-- on an empty list. It is called inside redirect_stdout(my_print), so the
-- print() output is written to the StringIO buffer. Nothing catches the
-- exception; the sample output shown after this function reports it as an
-- SQL ERROR (IndexError) instead of a returned value.
-- sys and traceback are imported but not used in this function.
create or replace function call_my_exception()
returns text
as $$
import io
import sys
from contextlib import redirect_stdout
import traceback
def print_exception():
    print("exception will happen")
    empty_list = []
    empty_list[1] += 1

my_print = io.StringIO()
with redirect_stdout(my_print):
    print_exception()
return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;
输出如下
mydb=# SELECT call_my_exception();
ERROR: IndexError: list index out of range
CONTEXT: Traceback (most recent call last):
PL/Python function "call_my_exception", line 14, in <module>
print_exception()
PL/Python function "call_my_exception", line 10, in print_exception
empty_list[1] += 1
PL/Python function "call_my_exception"
源代码:
-- Demonstrates catching a Python exception inside a PL/Python function and
-- returning its traceback as text.
-- print_traceback_exception() prints a message, executes
-- `empty_list[1] += 1` on an empty list inside try/except, and prints
-- traceback.format_exc() in the handler. The call runs inside
-- redirect_stdout(my_print), so both prints are written to the StringIO
-- buffer, and the function returns "to sql: " followed by the buffer's
-- contents.
-- sys is imported but not used in this function.
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
RETURNS text
LANGUAGE plpython3u
AS $function$
import io
import sys
from contextlib import redirect_stdout
import traceback

def print_traceback_exception():
    print("exception will happen")
    empty_list = []
    try:
        empty_list[1] += 1
    except Exception:
        print(traceback.format_exc())

my_print = io.StringIO()
with redirect_stdout(my_print):
    print_traceback_exception()
# plpy.info(my_print.getvalue())
return "to sql: {}".format(my_print.getvalue())
$function$;
输出如下
mydb=# SELECT call_my_tb_exception();
call_my_tb_exception
----------------------------------------------------------
to sql: exception will happen +
Traceback (most recent call last): +
File "<string>", line 12, in print_traceback_exception+
IndexError: list index out of range +
+
(1 row)