数据库SqlServer迁移PostgreSql实践

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
SqlServer属于商业数据库,不可能像Mysql等数据库一样,去解析相关的数据库binlog,从而实现增量数据的回放,结合应用属性,最后确定采用离线迁移方式,从SqlServer中将表数据全部读出,然后将数据写入到pg中,采用此种方案的弊病就是程序端需停止写入(应用可将部分数据缓存到本地),等待数据库迁移完成后,程序端再迁移至PostGresql,迁移方法如下

背景
公司某内部系统属于商业产品,数据库性能已出现明显问题,服务经常卡死,员工经常反馈数据无法查询或不能及时查询,该系统所使用的数据库为SqlServer,SqlServer数据库属于商业数据库,依赖厂商的维护,且维护成本高,效率低,且存在版权等问题,考虑将该系统的数据库,迁移至PostGresql数据库,属于BSD的开源数据库,不存在版本问题,公司也有部分系统采用pg,维护成本也将大大减低。

迁移原理
SqlServer属于商业数据库,不可能像Mysql等数据库一样,去解析相关的数据库binlog,从而实现增量数据的回放,结合应用属性,最后确定采用离线迁移方式,从SqlServer中将表数据全部读出,然后将数据写入到pg中,采用此种方案的弊病就是程序端需停止写入(应用可将部分数据缓存到本地),等待数据库迁移完成后,程序端再迁移至PostGresql,迁移方法如下: 

表结构迁移原理
表结构主要包含字段,索引,主键,外键等信息组成,主要采用开源工具sqlserver2pg进行表结构的转换

表结构转换
从SqlServer中读写表结构的字段信息,并对字段类型进行转换,转换核心代码如下

sub convert_type
{
    my ($sqlstype, $sqlqual, $colname, $tablename, $typname, $schemaname) =
        @_;
    my $rettype;
    if (defined $types{$sqlstype})
    {
        if ((defined $sqlqual and defined($unqual{$types{$sqlstype}}))
            or not defined $sqlqual)
        {
           # This is one of the few types that have to be unqualified (binary type)
           $rettype = $types{$sqlstype};

           # but we might add a check constraint for binary data
           if ($sqlstype =~ 'binary' and defined $sqlqual) {
              print STDERR "convert_type: $sqlstype, $sqlqual, $colname\n";
              my $constraint;
              $constraint->{TYPE}  = 'CHECK_BINARY_LENGTH';
              $constraint->{TABLE} = $tablename;
              $constraint->{TEXT}  = "octet_length(" . format_identifier($colname) . ") <= $sqlqual";
              push @{$objects->{SCHEMAS}->{$schemaname}->{TABLES}->{$tablename}
                        ->{CONSTRAINTS}}, ($constraint);
           }
        }
        elsif (defined $sqlqual)
        {
            $rettype = ($types{$sqlstype} . "($sqlqual)");
        }
    }

    # A few special cases
    elsif ($sqlstype eq 'bit' and not defined $sqlqual)
    {
        $rettype = "boolean";
    }
    elsif ($sqlstype eq 'ntext' and not defined $sqlqual)
    {
        $rettype = "text";
    }

外键,索引,唯一键转换

主要是从sqlserver导出的表结构数据中,对相关的索引,外键等语句进行转换,转换核心代码如下

while (my ($schema, $refschema) = each %{$objects->{SCHEMAS}})
    {
        # Indexes
        # They don't have a schema qualifier. But their table has, and they are in the same schema as their table
        foreach my $table (sort keys %{$refschema->{TABLES}})
        {
            foreach
                my $index (
                   sort keys %{$refschema->{TABLES}->{$table}->{INDEXES}})
            {
                my $index_created = 0;
                my $idxref =
                    $refschema->{TABLES}->{$table}->{INDEXES}->{$index};
                my $idxdef .= "";
                if ($idxref->{DISABLE})
                {
                    $idxdef .= "-- ";
                }
                $idxdef .= "CREATE";
                if ($idxref->{UNIQUE})
                {
                    $idxdef .= " UNIQUE";
                }
                if (defined $idxref->{COLS})
                {
                   $idxdef .= " INDEX " . format_identifier($index) . " ON " . format_identifier($schema) . '.' . format_identifier($table) . " ("
                     . join(",", map{format_identifier_cols_index($_)} @{$idxref->{COLS}}) . ")";

                   if (defined $idxref->{INCLUDE}) {
                      $idxdef .= " INCLUDE (" .
                         join(",", map{format_identifier_cols_index($_)} @{$idxref->{INCLUDE}})
                         . ")";
                   }

                   if (not defined $idxref->{WHERE} and not defined $idxref->{DISABLE}) {
                      $idxdef .= ";\n";
                      print AFTER $idxdef;
                      # the possible comment would go to after file
                      $index_created = 1;
                   }

数据类型转换原理

数据类型转换

函数类型转换

存储过程

视图部分需手动改造

迁移方法

表结构转换

./sqlserver2pgsql.pl -b before.sql -a after.sql -u unsure.sql -k /opt/data_migration/data-integration/  -sd test -sh 127.0.0.1 -sp 1433 -su user_only -sw 122132321 -pd  test -ph 192.168.1.1 -pp 15432 -pu postgres -pw 12345678  -pi 8  -po 8 -f script.sql

表结构导入pg

/usr/local/pgsql1201/bin/psql -h 127.0.0.1 -U postgres -p 15432 <before.sql

数据迁移

cd /opt/data_migration/data-integration/
sh kitchen.sh -file=migration.kjb  -level=detailed >migration.log

数据比对

#!/usr/bin/env python
# -*- coding: utf-8 -*-


"""
@author:jacker_zhou
@create_time: 2017-04-07
@overview: mssql pg 
"""

__author__ = 'jacker_zhou'
__version__ = '0.1'

import psycopg2,pymssql 
import types
import time
TableSpace='public.' 
class CompareDataBase(): 
    def __init__(self): 
        
        self.pgcnotallow=psycopg2.connect(database="test",host="127.0.0.1",port=15432,user="postgres",password="test") 
        
        self.mscnotallow=pymssql.connect(host="192.168.1.1",user="test",password="test",database="test") 
    def commit(self): 
        self.pgconn.commit() 
    def close(self): 
        self.pgconn.close() 
        self.msconn.close() 
    def rollback(self): 
        self.pgconn.rollback() 
    def exesyncdb(self): 
        mscursor=self.msconn.cursor() 
        sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ") 
        mscursor.execute(sql) 
        table=mscursor.fetchall()
        print ("total table %d"%len(table))
        if(table is None or len(table)<=0): 
            return 
        else: 
            for row in table: 
                self.executeTable(row[1],row[0]) 
                print ("%s is execute success"%row[1])
    def comparedb(self): 
        mscursor=self.msconn.cursor() 
        sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ") 
        mscursor.execute(sql) 
        table=mscursor.fetchall()
        print ("total table %d"%len(table))
        if(table is None or len(table)<=0): 
            return 
        else: 
            for row in table: 
                self.compareTable(row[1]) 
    def executeTable(self,tablename,count): 
        #print tablename 
        sql1="SELECT * FROM %s"%tablename 
        print (sql1)
        mscursor=self.msconn.cursor() 
        mscursor.execute(sql1) 
        table=mscursor.fetchall()
        if(table is None or len(table)<=0): 
            mscursor.close() 
            return 
        lst_result=self.initColumn(table)
        #print "column" 
        mscursor.close() 
        print ("execute sync  %s data to postgresql"%tablename)
        sql2=self.initPgSql(tablename,count)
        pgcursor=self.pgconn.cursor() 
        pgcursor.executemany(sql2,lst_result) 
        pgcursor.close()
    def compareTable(self,tablename): 
        #print tablename 
        sql1="SELECT count(*) FROM %s"%tablename 
        mscursor=self.msconn.cursor() 
        mscursor.execute(sql1) 
        ms_res=mscursor.fetchall() 
        mscursor.close() 
        pgcursor=self.pgconn.cursor()
        pgcursor.execute(sql1) 
        pg_res=pgcursor.fetchall() 
        pgcursor.close()
        res =""
        if ms_res[0][0] == pg_res[0][0]:
            res ="ok"
        else:
            res = "fail"

        print ("execute compare  table  %s data  postgresql: %s  mssql:%s result: %s"%(tablename,pg_res[0][0],ms_res[0][0],res))
    if __name__=="__main__": 
        sdb= CompareDataBase()
        start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print ("task start time %s"%start_time)
        try: 
            sdb.comparedb()
        except Exception as e: 
            print (e) 
            sdb.rollback() 
        else: 
            sdb.commit() 
        sdb.close() 
        end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print ("task end time %s"%end_time)
        print ("ok........" )

参考

​ ​https://github.com/dalibo/sqlserver2pgsql​

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6