NodeJS,mysql2/promise,连接池执行,将数据复制到另一个数据库

背景

产品开发了新版本,数据不兼容,新版本的数据结构比旧版本复杂一些,目前,旧版本是在线系统,需要将旧版本程序的数据适配至新版本程序。数据库是MySQL,这里选择使用NodeJS来完成。

使用的版本:

"mysql": "^2.18.1", "mysql2": "^2.3.3"

工具代码

需要的依赖

npm install mysql --save npm install mysql2 --save

案例一,单表的数据同步:

database\mysql\mysql.tools.js

const mysql = require('mysql'); // 创建连接 const createConnection = (host, port, database, user, password) => { let connection = mysql.createConnection({ host, user, password, port, database, }); connection.connect(function (error) { if (error) { // console.log('[query] - :' + error); return; } // console.log('[src][connection connect] succeed!'); }); return connection; }; // 关闭连接 const closeConnection = (connection) => { connection.end(function (error) { if (error) { return; } // console.log('[src][connection end] succeed!'); }); }; module.exports = { createConnection, closeConnection }

database\mysql2\mysql2.tools.js

const mysql = require('mysql2'); // 创建连接 const createConnection = (host, port, database, user, password) => { let connection = mysql.createConnection({ host: host, user: user, password: password, database: database, port: port, multipleStatements: true }); return connection; }; // 关闭连接 const closeConnection = (connection) => { connection.end(function (error) { if (error) { return; } // console.log('[src][connection end] succeed!'); }); }; module.exports = { createConnection, closeConnection }

syncdata\00.base\syncData.js

let source = { host: "127.0.0.1", port: 3306, database: "db1", user: "root", password: "123456" }; let target = { host: "127.0.0.1", port: 3306, database: "db2", user: "root", password: "123456" }; // 主体ID和主体编号 const subject_id = '1001'; const subject_code = 'ABCDEFG'; module.exports = { source, target, subject_id, subject_code }

syncdata\01.manufactor\main.manufactor.js

const { createConnection, closeConnection } = require('../../database/mysql/mysql.tools'); const { source, target, subject_id, subject_code } = require('../00.base/syncData'); // 创建连接 let sourceConnection = createConnection(source.host, source.port, source.database, source.user, source.password); let targetConnection = createConnection(target.host, target.port, target.database, target.user, target.password); console.log("sourceConnection", sourceConnection); console.log("targetConnection", targetConnection); // 拷贝数据 function executeCopyData(sourceConnection, targetConnection, closeConnection) { // ====================================================================// // 1、查询数据 // ====================================================================// let selectSql = "select * from health_manufactor"; sourceConnection.query(selectSql, function (error, result, fields) { if (error) { throw error; } if (result.length > 0) { let count = 1; result.forEach(item => { console.log("item", item); // ====================================================================// // 2、插入数据 // ====================================================================// let addSql = `INSERT INTO goods_manufactor( id,subject_id,subject_code,manufactor_code,manufactor_name,address,phone, sorted,remark,status,logical_deleted,create_uid, create_user,create_time,modified_user,modified_time ) VALUES( ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ? )`; let addSqlParams = [ item.id, subject_id, subject_code, item.manufactor_code, item.manufactor_name, item.address, '', 1, '', 1, 0, 0, 'system', new Date(), 'system', new Date() ]; targetConnection.query(addSql, addSqlParams, function (addErr, addResult) { if (addErr) { console.log(addErr); throw addErr; } console.log('addResult.insertId:', addResult.insertId); count++; if (count == result.length) { closeConnection(); } }); }); } }); } 

syncdata\02.brand\main.brand.js

const { createConnection, closeConnection } = require("../../database/mysql2/mysql2.tools"); const { source, target, subject_id, subject_code } = require('../00.base/syncData'); // 创建连接 let sourceConnection = createConnection(source.host, source.port, source.database, source.user, source.password); let targetConnection = createConnection(target.host, target.port, target.database, target.user, target.password); console.log("sourceConnection", sourceConnection); console.log("targetConnection", targetConnection); // 拷贝数据 function executeCopyData(sourceConnection, targetConnection, closeConnection) { // ====================================================================// // 1、查询数据 // ====================================================================// let selectSql = "SELECT * FROM health_brand"; sourceConnection.query(selectSql, function (error, result, fields) { if (error) { throw error; } if (result.length > 0) { let count = 1; for (let i = 0; i < result.length; i++) { // ====================================================================// // 2、查询一条数据 // ====================================================================// let selectOneSql = "select * from goods_manufactor where id = ?"; let selectOneSqlParams = [result[i].brand_manufactor_id]; targetConnection.query(selectOneSql, selectOneSqlParams, function (error1, result1, fields1) { if (error1) { throw error1; } console.log('result1', result1); // ====================================================================// // 3、插入数据 // ====================================================================// let insertSql = `insert into goods_brand( id,subject_id,subject_code,manufactor_id,manufactor_code, brand_code,brand_name,sorted,remark,status, logical_deleted,create_uid,create_user,create_time,modified_user, modified_time ) values( ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ? )`; let insertSqlParams = [ result[i].id, subject_id, subject_code, result[i].brand_manufactor_id, result1[0].manufactor_code, result[i].brand_code, result[i].brand_name, 1, '', result[i].status, 0, 0, 'system', new Date(), 'system', new Date() ]; targetConnection.query(insertSql, insertSqlParams, function (error2, result2) { if (error2) { console.log(error2); throw error2; } console.log('result2.insertId:', result2.insertId); count++; if (count == result.length) { closeConnection(); } }); // ====================================================================// // The end // ====================================================================// }); } } }); } // 执行拷贝数据 executeCopyData(sourceConnection, targetConnection, function () { console.log('close sourceConnection ', sourceConnection); console.log('close targetConnection', targetConnection); closeConnection(sourceConnection); closeConnection(targetConnection); });

syncdata\03.category\main.category.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools"); const { source, target, subject_id, subject_code } = require('../00.base/syncData'); let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password); let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password); console.log("sourceConnectionPool", sourceConnectionPool); console.log("targetConnectionPool", targetConnectionPool); // 拷贝数据 async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) { // ====================================================================// // 1、查询数据 // ====================================================================// let selectSql = "SELECT * FROM health_category"; const [res] = await sourceConnectionPool.execute(selectSql); console.log("res", res); let count = 1; res.forEach(async (item) => { // ====================================================================// // 2、插入数据 // ====================================================================// let insertSql = `insert into goods_category( id,subject_id,subject_code,category_code,category_name, parent_id,parent_code,tree_level,parent_full_path,sorted, remark,status,logical_deleted,create_uid,create_user, create_time,modified_user,modified_time ) values( ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,? )`; let insertSqlParams = [ item.id, subject_id, subject_code, item.category_code, item.category_name, -1, '', 1, '', 1, '', 1, 0, 0, "system", new Date(), 'system', new Date() ]; const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams); console.log("results", results); // 关闭 count++; if (count == res.length) { closeConnectionPoolHandle(); } }); } // 拷贝数据 executeCopyData(sourceConnectionPool, targetConnectionPool, () => { console.log('close sourceConnectionPool ', sourceConnectionPool); console.log('close targetConnectionPool', targetConnectionPool); closeConnectionPool(sourceConnectionPool); closeConnectionPool(targetConnectionPool); }); 

案例二,关联表的数据同步:

database\mysql2\mysql2.promise.tools.js

const mysql = require('mysql2/promise'); // 创建连接池 const createConnectionPool = (host, port, database, user, password) => { const pool = mysql.createPool({ host: host, user: user, password: password, database: database, port: port, //连接超额时是否等待 waitForConnections: true, //连接的最多的个数 connectionLimit: 5, //可以等待的连接的个数 queueLimit: 0 }); return pool; } // 创建连接 const createConnection = (host, port, database, user, password) => { let connection = mysql.createConnection({ host: host, user: user, password: password, database: database, port: port, multipleStatements: true }); return connection; }; // 关闭连接池 const closeConnectionPool = (connectionPool) => { connectionPool.end(function (error) { if (error) { return; } // console.log('[src][connection end] succeed!'); }); }; module.exports = { createConnectionPool, createConnection, closeConnectionPool };

syncdata\04.general_name\main.general_name.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools"); const { source, target, subject_id, subject_code } = require('../00.base/syncData'); let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password); let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password); console.log("sourceConnectionPool", sourceConnectionPool); console.log("targetConnectionPool", targetConnectionPool); // 拷贝数据 async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) { // ====================================================================// // 1、查询数据 // ====================================================================// let selectSql = "SELECT * FROM health_icd_general_name"; sourceConnectionPool.execute(selectSql).then(data => { const [res] = data; // console.log("res", res); let itemMap = new Map(); let categroySet = new Set(); res.forEach(item => { itemMap.set(item.id, item); categroySet.add(item.category_id); }); let resolveData = { itemList: res, itemMap: itemMap, categroyList: Array.from(categroySet), }; return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { const { itemList, itemMap, categroyList } = data; // console.log("itemMap", itemMap); // console.log("categroyList", categroyList); // ====================================================================// // 2、查询数据 // ====================================================================// let selectSql = "select * from goods_category"; // let selectSql2 = "select * from goods_category where id in (?)"; let sekectSqlParam = [categroyList]; const [res] = await targetConnectionPool.execute(selectSql); // const [res] = await targetConnectionPool.execute(selectSql, sekectSqlParam); console.log("res", res); let categroyMap = new Map(); res.forEach(item => { categroyMap.set(item.id, item); }); let resolveData = { itemList: itemList, itemMap: itemMap, categroyList: categroyList, categroyMap: categroyMap, }; return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(data => { const { itemList, itemMap, categroyList, categroyMap } = data; //console.log("itemList", itemList); //console.log("itemMap", itemMap); //console.log("categroyList", categroyList); //console.log("categroyMap", categroyMap); let count = 1; itemList.forEach(async (item) => { // ====================================================================// // 3、插入数据 // ====================================================================// let insertSql = `insert into goods_generic_name( id,subject_id,subject_code,name,code, category_id,category_code,parent_id,parent_code,remark, status,sorted,logical_deleted,create_uid,create_user, create_time,modified_user,modified_time ) values( ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,? )`; // 分类编号 let category_code = ''; if (categroyMap.get(item.category_id)) { category_code = categroyMap.get(item.category_id).category_code; } // 父通用名编号 let parent_code = ''; if (itemMap.get(item.parent_id)) { parent_code = itemMap.get(item.parent_id).general_code; } let parent_id = item.parent_id; if (item.id == 0) { parent_id = -1; } let insertSqlParams = [ item.id, subject_id, subject_code, item.general_name, item.general_code, item.category_id, category_code, item.parent_id, parent_code, '', 0, 1, 0, 0, 'system', new Date(), 'system', new Date() ]; console.log("insertSql", insertSql); console.log("insertSqlParams", insertSqlParams); // const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams); //console.log("results", results); // count++; if (count == itemList.length) { closeConnectionPoolHandle(); } }); }); } // 拷贝数据 executeCopyData(sourceConnectionPool, targetConnectionPool, () => { console.log('close sourceConnectionPool ', sourceConnectionPool); console.log('close targetConnectionPool', targetConnectionPool); closeConnectionPool(sourceConnectionPool); closeConnectionPool(targetConnectionPool); });

syncdata\05.goods\main.goods.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools"); const { source, target, subject_id, subject_code } = require('../00.base/syncData'); let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password); let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password); console.log("sourceConnectionPool", sourceConnectionPool); console.log("targetConnectionPool", targetConnectionPool); // 拷贝数据 async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) { // ====================================================================// // 1、查询数据 // ====================================================================// let selectSql = "SELECT * FROM health_goods"; sourceConnectionPool.execute(selectSql).then(data => { const [res] = data; // console.log("res", res); const resolveData = { goodsList: res }; return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { let selectSql = "SELECT * FROM goods_manufactor"; let [res] = await targetConnectionPool.execute(selectSql); // console.log("res", JSON.stringify(res)); let resolveData = Object.assign(data, { manufactorList: res }); // console.log("resolveData", JSON.stringify(resolveData)); return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { let selectSql = "SELECT * FROM goods_brand"; let [res] = await targetConnectionPool.execute(selectSql); //console.log("res", JSON.stringify(res)); let resolveData = Object.assign(data, { brandList: res }); // console.log("resolveData", resolveData); return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { let selectSql = "SELECT * FROM goods_category"; let [res] = await targetConnectionPool.execute(selectSql); //console.log("res", JSON.stringify(res)); let resolveData = Object.assign(data, { categoryList: res }); // console.log("resolveData", resolveData); return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { let selectSql = "SELECT * FROM goods_generic_name"; let [res] = await targetConnectionPool.execute(selectSql); //console.log("res", JSON.stringify(res)); let resolveData = Object.assign(data, { genericNameList: res }); // console.log("resolveData", resolveData); return new Promise(function (resolve, reject) { resolve(resolveData); }); }).then(async (data) => { const { goodsList, manufactorList, brandList, categoryList, genericNameList } = data; // console.log("goodsList", goodsList.length); console.log("manufactorList", manufactorList.length); console.log("brandList", brandList.length); console.log("categoryList", categoryList.length); console.log("genericNameList", genericNameList); // console.log("goodsList", goodsList); let manufactorMap = new Map(); manufactorList.forEach((item) => { manufactorMap.set(item.id, item); }); let brandMap = new Map(); brandList.forEach((item) => { brandMap.set(item.id, item); }); let categoryMap = new Map(); categoryList.forEach((item) => { categoryMap.set(item.id, item); }); let genericNameMap = new Map(); genericNameList.forEach((item) => { genericNameMap.set(item.id, item); }); // ====================================================================// // 插入数据 // ====================================================================// let count = 1; goodsList.forEach(async (item) => { let insertSql = `insert into goods_object( id,subject_id,subject_code,manufacturer_id,manufacturer_code, brand_id,brand_code,category_id,category_code,general_name_first_id, general_name_first_code,general_name_id,general_name_code,name,code, product_name,short_code,bar_code,product_type,instructions, main_graph_url,introduce_url,details,introduce,simple_spelling, dosage_form,apparatus_model,preservation_method,quality_guarantee_period,status, dosage,logical_deleted,create_uid,create_user,create_time, modified_user,modified_time ) values( ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?, ?,? )`; // 厂家信息 let manufacturer_id = item.manufactor_id; let manufacturer_code = manufactorMap.get(manufacturer_id).manufactor_code; // 品牌信息 let brand_id = item.brand_id; let brand_code = brandMap.get(brand_id).brand_code; // 分类信息 let category_id = item.category_id; let category_code = categoryMap.get(category_id).category_code; // 通用名信息 let general_name_id = item.goods_general_name_id; let general_name_code = genericNameMap.get(general_name_id).code; // 插入参数 let insertSqlParams = [ item.id, subject_id, subject_code, manufacturer_id, manufacturer_code, brand_id, brand_code, category_id, category_code, 0, '', general_name_id, general_name_code, item.goods_name, item.goods_code, item.product_name, item.short_code, item.goods_bar_code, item.goods_type, item.goods_instructions, item.goods_img, '', '', item.goods_instructions, item.simple_spelling, item.goods_dosageform, '', item.preservation_method, item.quality_guarantee_period, 1, '', 0, 0, 'system', new Date(), 'system', new Date() ]; // const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams); console.log("results", results); // count++; if (count == goodsList.length) { closeConnectionPoolHandle(); } }); }); } // 拷贝数据 executeCopyData(sourceConnectionPool, targetConnectionPool, () => { console.log('close sourceConnectionPool ', sourceConnectionPool); console.log('close targetConnectionPool', targetConnectionPool); closeConnectionPool(sourceConnectionPool); closeConnectionPool(targetConnectionPool); });

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注