PLPython 常见问题

本文档介绍 PLPython 相关的常见问题。

1 PLPython 和 Python 连接数据库操作的区别与优劣是什么?


直接使用 Python + 连接库操作数据库,行为是单机的; PLPython 能够下沉计算到数据库,利用分布式的计算能力。

2 如何在数据库中使用 PLPython


只需在 YMatrix 中创建如下扩展,即可在数据库中使用 PLPython。

-- Enable the PL/Python 3 language (plpython3u) in the current database.
create extension plpython3u;

3 PLPython 中如果需要第三方库,该如何安装?


直接使用 pip3 在所有 YMatrix 服务器安装即可。

4 Python 访问 YMatrix 数据库示例


源代码

# -*- coding: utf-8 -*-
import psycopg2

class MatrixDB(object):
    """Small psycopg2 wrapper demonstrating basic access to a YMatrix database.

    Every operation opens its own connection and always closes it again,
    including when the statement raises.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values used by this example; pass your own
        values for a real deployment instead of editing the class.

        Args:
            host: Database server address.
            user: Database user name.
            database: Name of the database to connect to.
            port: Server port (kept as a string, as in the example).
            password: Password for ``user``.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection using the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def _run(self, sql, fetch_one=False):
        """Execute ``sql`` on a fresh connection and commit.

        Args:
            sql: The SQL statement to execute.
            fetch_one: When true, fetch one row before committing.

        Returns:
            The result of ``cursor.fetchone()`` when ``fetch_one`` is true,
            otherwise ``None``.
        """
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            # Close on success and on failure so that a failing statement
            # does not leak the connection.
            conn.close()

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = (
            "CREATE TABLE data("
            "time timestamp,"
            "tag_id int,"
            "metrics1 float8,"
            "metrics2 float8,"
            "metrics3 float8"
            ")Distributed by(tag_id)"
        )
        self._run(sql)

    def insert(self):
        """Insert one fixed sample row stamped with the server's ``now()``."""
        self._run("INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)")

    def select(self):
        """Return one row of ``data`` as fetched by ``cursor.fetchone()``.

        The query has no ORDER BY, so which row comes back is up to the server.
        """
        return self._run("SELECT * FROM data", fetch_one=True)

if __name__ == "__main__":
    # Demo run: create the table, add the sample row, then show one row.
    mxdb = MatrixDB()
    mxdb.create_table()
    mxdb.insert()
    fetched_row = mxdb.select()
    print(fetched_row)

5 Python 批量插入数据示例


源代码

# -*- coding: utf-8 -*-
import psycopg2
import random
import time
from io import StringIO

class MatrixDB(object):
    """psycopg2 wrapper demonstrating two ways to bulk-load the ``data`` table.

    ``insert`` sends one multi-row INSERT with bound parameters;
    ``copybatch`` streams the rows through COPY. Every operation opens its own
    connection and always closes it again, including when the statement raises.
    """

    def __init__(self, host="69.230.237.118", user="shidb", database="test",
                 port="5432", password="sfaLxxXwkh"):
        """Store the connection settings.

        The defaults are the sample values used by this example; pass your own
        values for a real deployment instead of editing the class.

        Args:
            host: Database server address.
            user: Database user name.
            database: Name of the database to connect to.
            port: Server port (kept as a string, as in the example).
            password: Password for ``user``.
        """
        self.host = host
        self.user = user
        self.database = database
        self.port = port
        self.password = password

    def get_conn(self):
        """Open and return a new psycopg2 connection using the stored settings."""
        return psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)

    def create_table(self):
        """Create the ``data`` table, distributed by ``tag_id``."""
        sql = (
            "CREATE TABLE data("
            "time timestamp,"
            "tag_id int,"
            "metrics1 float8,"
            "metrics2 float8,"
            "metrics3 float8"
            ")Distributed by(tag_id)"
        )
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _random_row():
        """Return one random ``(tag_id, metrics1, metrics2, metrics3)`` sample.

        ``tag_id`` is an integer in 0..100; each metric is a float in 0..1
        rounded to two decimals.
        """
        tag_id = round(random.random() * 100)
        return (tag_id,
                round(random.random(), 2),
                round(random.random(), 2),
                round(random.random(), 2))

    @staticmethod
    def _build_insert_sql(row_count):
        """Return a multi-row INSERT with four ``%s`` placeholders per row."""
        one_row = "(now(), %s, %s, %s, %s)"
        return "insert into data values " + ",".join([one_row] * row_count)

    @staticmethod
    def _rows_to_copy_text(rows):
        """Render rows as tab-separated, newline-terminated COPY input text."""
        return "".join("\t".join(str(value) for value in row) + "\n"
                       for row in rows)

    def _generate_rows(self, num):
        """Build ``num`` random rows, printing each one as it is generated."""
        rows = []
        for i in range(num):
            row = self._random_row()
            print(i, "random:", *row)
            rows.append(row)
        return rows

    def insert(self, num):
        """Insert ``num`` random rows with a single multi-row INSERT.

        Values are passed as bound parameters rather than spliced into the SQL
        text, and ``now()`` is evaluated by the server as a function call.
        Nothing is sent when ``num`` produces no rows, because an INSERT with
        an empty VALUES list is not valid SQL.

        Args:
            num: Number of rows to generate and insert.
        """
        start_time = time.time()
        print("start_time:", start_time)
        rows = self._generate_rows(num)
        if rows:
            sql = self._build_insert_sql(len(rows))
            # Flatten [(a, b, c, d), ...] to match the placeholder order.
            params = [value for row in rows for value in row]
            print("insert data str:", sql, params)
            print("batch insert data")
            conn = self.get_conn()
            try:
                cursor = conn.cursor()
                cursor.execute(sql, params)
                conn.commit()
            finally:
                conn.close()
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def copybatch(self, num):
        """Load ``num`` random rows through ``cursor.copy_from`` (COPY).

        Only the tag and metric columns are supplied; the ``time`` column is
        not part of the COPY column list.

        Args:
            num: Number of rows to generate and load.
        """
        start_time = time.time()
        print("start_time:", start_time)
        rows = self._generate_rows(num)
        insert_data = self._rows_to_copy_text(rows)
        print("insert_data", insert_data)
        # A StringIO created from initial text is positioned at the start,
        # ready to be read by copy_from.
        f = StringIO(insert_data)
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            # sep and null are passed explicitly; per the psycopg2
            # documentation these values are also copy_from's defaults.
            cursor.copy_from(f, "data",
                             columns=("tag_id", "metrics1", "metrics2", "metrics3"),
                             sep='\t', null='\\N', size=16384)
            conn.commit()
        finally:
            # The connection was previously left open by this method.
            conn.close()
        end_time = time.time()
        print("end_time:", end_time, end_time - start_time)

    def select(self):
        """Return the ``count(*)`` row of ``data`` as fetched by ``fetchone()``."""
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT count(*) FROM data")
            data = cursor.fetchone()
            conn.commit()
            return data
        finally:
            conn.close()

if __name__ == "__main__":
    mxdb = MatrixDB()
    # Optional steps, disabled by default: create the table and try the
    # multi-row INSERT path before the COPY path.
    # mxdb.create_table()
    # mxdb.insert(3)
    # print(mxdb.select())
    mxdb.copybatch(3)
    row_count = mxdb.select()
    print(row_count)

6 PLPython 中使用 Python 代码


PLPython 中使用 Python 代码,方式与 Python 中调用其它 workspace 下的 Python 程序一样,只需通过 sys.path.append 添加对应路径即可。

示例

/home/mxadmin/python/demo.py 写入如下代码:

# User-defined helper function
def print_and_return(anything):
    """Print ``anything`` with a "print_and_return: " prefix and return it unchanged."""
    message = "print_and_return: {}".format(anything)
    print(message)
    return anything

# Quick manual check of the helper. Guarded so that it runs only when demo.py
# is executed directly, not every time the module is imported.
if __name__ == "__main__":
    print_and_return("hello_world")
    print_and_return(["hello", "world"])

在 PLPython 中 import 该函数:

-- Example PL/Python function that imports a helper from a Python file on disk
-- (/home/mxadmin/python/demo.py) and returns the my_time argument as text.
create or replace function call_my_function(my_name text, my_time timestamptz, my_bat_voltage numeric)
returns text
as $$

import sys

# Make the directory that holds demo.py importable. Guarded so that repeated
# calls do not keep appending the same entry to sys.path.
module_dir = '/home/mxadmin/python/'
if module_dir not in sys.path:
    sys.path.append(module_dir)

from demo import print_and_return

# Call the helper imported from demo.py; its return value is not needed here.
print_and_return("my name is: {} time: {} bat: {}".format(my_name, my_time, my_bat_voltage))
# Return the result to SQL.
return str(my_time)
$$ language plpython3u;

输出如下

mydb=# select call_my_function('a', now(), 1);
       call_my_function        
-------------------------------
 2022-11-17 17:14:29.618774+08
(1 row)

7 PLPython 标准异常处理


源代码

-- call_my_exception(): demonstrates what happens when a Python exception is
-- NOT handled inside a PL/Python function.
--
-- The body redirects stdout into an in-memory io.StringIO buffer and calls
-- print_exception(), which prints a message and then executes
-- `empty_list[1] += 1` on an empty list. That statement raises IndexError,
-- nothing catches it, and so the final `return "to sql: ..."` is never reached.
--
-- The `sys` and `traceback` imports are not used by this body.
-- NOTE: a traceback reports line numbers within the function body, so adding
-- or removing lines between the $$ markers changes the reported numbers.
create or replace function call_my_exception()
returns text
as $$
import io
import sys
from contextlib import redirect_stdout
import traceback

def print_exception():
    print("exception will happen")
    empty_list = []
    empty_list[1] += 1

my_print = io.StringIO()
with redirect_stdout(my_print):
    print_exception()

return "to sql: {}".format(my_print.getvalue())
$$ language plpython3u;

输出如下

mydb=# select call_my_exception();
ERROR:  IndexError: list index out of range
CONTEXT:  Traceback (most recent call last):
  PL/Python function "call_my_exception", line 14, in <module>
    print_exception()
  PL/Python function "call_my_exception", line 10, in print_exception
    empty_list[1] += 1
PL/Python function "call_my_exception"

8 使用 traceback 处理 PLPython 异常


源代码

-- call_my_tb_exception(): demonstrates handling a Python exception inside a
-- PL/Python function and returning its traceback text to SQL.
--
-- The body redirects stdout into an in-memory io.StringIO buffer and calls
-- print_traceback_exception(), which prints a message and then executes
-- `empty_list[1] += 1` on an empty list inside try/except. The except branch
-- prints traceback.format_exc(), so the traceback text lands in the buffer.
-- The function then returns "to sql: " followed by everything captured.
--
-- The `sys` import is not used by this body. The commented-out plpy.info(...)
-- line is left in place as an optional alternative way to surface the
-- captured text (presumably as an INFO message; confirm against the PL/Python
-- documentation).
-- NOTE: a traceback reports line numbers within the function body, so adding
-- or removing lines between the $function$ markers changes the reported numbers.
CREATE OR REPLACE FUNCTION public.call_my_tb_exception()
 RETURNS text
 LANGUAGE plpython3u
AS $function$
import io
import sys
from contextlib import redirect_stdout
import traceback

def print_traceback_exception():
    print("exception will happen")
    empty_list = []
    try:
        empty_list[1] += 1
    except Exception:
        print(traceback.format_exc())

my_print = io.StringIO()
with redirect_stdout(my_print):
    print_traceback_exception()

# plpy.info(my_print.getvalue())

return "to sql: {}".format(my_print.getvalue())
$function$;

输出如下

mydb=# select call_my_tb_exception();
                   call_my_tb_exception                   
----------------------------------------------------------
 to sql: exception will happen                           +
 Traceback (most recent call last):                      +
   File "<string>", line 12, in print_traceback_exception+
 IndexError: list index out of range                     +
                                                         +

(1 row)