package com.se.simu.utils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.se.simu.domain.vo.StationRainVo; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.*; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; /** * CSV 到 SQLite 实用程序 * * @author xingjinshuang@smartearth.cn * @date 2024/12/26 */ public class CsvToSQLiteUtils { private static final String DATABASE_URL = "jdbc:sqlite:D:\\0a_project\\simulation\\simuserver\\rainfall.db"; // SQLite数据库路径 private static final String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS rainfall (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " station_name TEXT NOT NULL," + " rainfall REAL NOT NULL," + " longitude REAL NOT NULL," + " latitude REAL NOT NULL," + " datetime TEXT NOT NULL" + ");"; private static final String INSERT_DATA_SQL = "INSERT INTO rainfall (station_name, rainfall, longitude, latitude, datetime) " + "VALUES (?, ?, ?, ?, ?);"; public static void main(String[] args) { // CSV文件路径 String csvFilePath = "D:\\0a_project\\model\\shp\\雨量站点数据\\雨量站包含坐标.csv"; // 1. 创建SQLite数据库连接 try (Connection conn = DriverManager.getConnection(DATABASE_URL)) { if (conn != null) { // 获取当前时间戳 Timestamp timestamp = new Timestamp(System.currentTimeMillis()); String tableName = "rainfall_" + timestamp.getTime(); System.out.println("tableName = " + tableName); // 2. 创建表 createTableIfNotExists(conn, tableName); // 3. 设置 SQLite 连接的字符编码为 UTF-8 try (Statement stmt = conn.createStatement()) { stmt.execute("PRAGMA encoding = 'UTF-8';"); // 设置SQLite编码为UTF-8 } // 4. 开始事务 conn.setAutoCommit(false); // 5. 读取CSV文件并插入数据 readCsvAndInsertData(csvFilePath, conn, tableName); // 6. 提交事务 conn.commit(); System.out.println("数据成功插入到SQLite数据库!"); } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } } /** * 创建表,如果不存在的话 */ private static void createTableIfNotExists(Connection conn) throws SQLException { try (Statement stmt = conn.createStatement()) { stmt.execute(CREATE_TABLE_SQL); } } private static void createTableIfNotExists(Connection conn, String tableName) throws SQLException { try (Statement stmt = conn.createStatement()) { String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS " + tableName + " (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " station_name TEXT NOT NULL," + " rainfall REAL NOT NULL," + " longitude REAL NOT NULL," + " latitude REAL NOT NULL," + " datetime TEXT NOT NULL" + ");"; stmt.execute(CREATE_TABLE_SQL); } } /** * 读取CSV文件并将数据插入到SQLite数据库 * * @param csvFilePath CSV文件路径 * @param conn SQLite数据库连接 */ private static void readCsvAndInsertData(String csvFilePath, Connection conn, String tableName) { // 使用 Apache Commons CSV 读取CSV文件 try (Reader reader = new InputStreamReader(Files.newInputStream(Paths.get(csvFilePath)), "GBK")) { // try (Reader reader = new InputStreamReader(Files.newInputStream(Paths.get(csvFilePath)), StandardCharsets.UTF_8)) { Iterable records = CSVFormat.DEFAULT .withHeader("雨量站", "降雨量", "经度", "纬度", "datatime") .withSkipHeaderRecord() // 跳过表头 .parse(reader); String INSERT_DATA_SQL = "INSERT INTO " + tableName + " (station_name, rainfall, longitude, latitude, datetime) " + "VALUES (?, ?, ?, ?, ?);"; // 准备SQL插入语句 try (PreparedStatement pstmt = conn.prepareStatement(INSERT_DATA_SQL)) { int batchSize = 1000; // 每批插入的数量 int count = 0; for (CSVRecord record : records) { // 获取每一列的数据 String stationName = record.get("雨量站"); double rainfall = Double.parseDouble(record.get("降雨量")); double longitude = Double.parseDouble(record.get("经度")); double latitude = Double.parseDouble(record.get("纬度")); String datetime = record.get("datatime"); // 设置插入数据的参数 pstmt.setString(1, stationName); pstmt.setDouble(2, rainfall); pstmt.setDouble(3, longitude); pstmt.setDouble(4, latitude); pstmt.setString(5, datetime); // 批量添加到批处理中 pstmt.addBatch(); count++; // 每1000条数据执行一次批处理 if (count % batchSize == 0) { pstmt.executeBatch(); // 执行批量插入 } } // 执行剩余的批量插入 pstmt.executeBatch(); } } catch (IOException | SQLException e) { System.err.println("读取CSV或插入数据时出错: " + e.getMessage()); } } /** * 读取 CSV 保存本地 * * @param array 数据 * @param tableName 表名称 */ public static void readCsvSaveLocal(JSONArray array, String tableName) { // 1. 创建SQLite数据库连接 try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { if (conn != null) { // 2. 创建表(如果不存在) createTableIfNotExists(conn, tableName); // 3. 设置SQLite连接的字符编码为UTF-8 try (Statement stmt = conn.createStatement()) { // 设置SQLite编码为UTF-8 stmt.execute("PRAGMA encoding = 'UTF-8';"); } // 4. 开始事务 conn.setAutoCommit(false); // 5. 读取CSV文件并插入数据 readCsvAndInsertDatas(array, conn, tableName); // 6. 提交事务 conn.commit(); System.out.println("数据成功插入到SQLite数据库!"); } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } } /** * 读取 CSV 保存本地 * * @param stationRainFile Station Rain 文件 * @param tableName 表名称 */ public static JSONArray readCsvSaveLocal(String stationRainFile, String tableName) { // 1. 创建SQLite数据库连接 JSONArray array = null; try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { if (conn != null) { // 2. 创建表(如果不存在) createTableIfNotExists(conn, tableName); // 3. 设置SQLite连接的字符编码为UTF-8 try (Statement stmt = conn.createStatement()) { // 设置SQLite编码为UTF-8 stmt.execute("PRAGMA encoding = 'UTF-8';"); } // 4. 开始事务 conn.setAutoCommit(false); // 5. 读取CSV文件并插入数据 array = readCsvAndInsertDatas(stationRainFile, conn, tableName); // 6. 提交事务 conn.commit(); System.out.println("数据成功插入到SQLite数据库!"); } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } return array; } private static void readCsvAndInsertDatas(JSONArray array, Connection conn, String tableName) { String insertDataSql = "INSERT INTO " + tableName + " (station_name, rainfall, longitude, latitude, datetime) VALUES (?, ?, ?, ?, ?);"; try (PreparedStatement pstmt = conn.prepareStatement(insertDataSql)) { // 批量大小 int batchSize = 1000; int count = 0; for (int i = 0; i < array.size(); i++) { JSONObject object = array.getJSONObject(i); // 获取每一列的数据 String stationName = object.getString("stationName"); double rainfall = object.getDouble("rainfall"); double longitude = object.getDouble("longitude"); double latitude = object.getDouble("latitude"); String datetime = object.getString("datetime"); // 设置插入数据的参数 pstmt.setString(1, stationName); pstmt.setDouble(2, rainfall); pstmt.setDouble(3, longitude); pstmt.setDouble(4, latitude); pstmt.setString(5, datetime); // 添加到批处理 pstmt.addBatch(); count++; // 每batchSize条数据执行一次批量插入 if (count % batchSize == 0) { pstmt.executeBatch(); } } // 执行剩余的批量插入 pstmt.executeBatch(); } catch (SQLException e) { System.err.println("批量插入数据时出错: " + e.getMessage()); } } private static JSONArray readCsvAndInsertDatas(String csvFilePath, Connection conn, String tableName) { // 使用 Apache Commons CSV 读取CSV文件 JSONArray array = new JSONArray(); try (Reader reader = new InputStreamReader(Files.newInputStream(Paths.get(csvFilePath)), "GBK")) { Iterable records = CSVFormat.DEFAULT .withHeader("雨量站", "降雨量", "经度", "纬度", "datatime") .withSkipHeaderRecord() // 跳过表头 .parse(reader); String insertDataSql = "INSERT INTO " + tableName + " (station_name, rainfall, longitude, latitude, datetime) VALUES (?, ?, ?, ?, ?);"; try (PreparedStatement pstmt = conn.prepareStatement(insertDataSql)) { // 批量大小 int batchSize = 1000; int count = 0; for (CSVRecord record : records) { // 获取每一列的数据 String stationName = record.get("雨量站"); double rainfall = Double.parseDouble(record.get("降雨量")); double longitude = Double.parseDouble(record.get("经度")); double latitude = Double.parseDouble(record.get("纬度")); String datetime = record.get("datatime"); // 设置插入数据的参数 pstmt.setString(1, stationName); pstmt.setDouble(2, rainfall); pstmt.setDouble(3, longitude); pstmt.setDouble(4, latitude); pstmt.setString(5, datetime); JSONObject jsonObject = new JSONObject(); jsonObject.put("stationName", stationName); jsonObject.put("rainfall", rainfall); jsonObject.put("longitude", longitude); jsonObject.put("latitude", latitude); jsonObject.put("datetime", datetime); array.add(jsonObject); // 添加到批处理 pstmt.addBatch(); count++; // 每batchSize条数据执行一次批量插入 if (count % batchSize == 0) { pstmt.executeBatch(); } } // 执行剩余的批量插入 pstmt.executeBatch(); } catch (SQLException e) { System.err.println("批量插入数据时出错: " + e.getMessage()); } } catch (IOException e) { System.err.println("读取CSV或插入数据时出错: " + e.getMessage()); } return array; } /** * 获取总计 * * @param tableName 表名称 * @return {@link Double} */ public static Double getTotal(String tableName) { // 1. 创建SQLite数据库连接 try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { if (conn != null) { // 2. 执行SQL查询 String queryDataSql = "select sum(rainfall_difference) as total from(SELECT ABS( (SELECT rainfall FROM " + tableName + " WHERE station_name = s.station_name ORDER BY datetime ASC LIMIT 1) - (SELECT rainfall FROM " + tableName + " WHERE station_name = s.station_name ORDER BY datetime DESC LIMIT 1)) AS rainfall_difference FROM " + tableName + " s GROUP BY station_name)"; // 3. 处理查询结果 try (PreparedStatement pstmt = conn.prepareStatement(queryDataSql)) { ResultSet rs = pstmt.executeQuery(); while (rs.next()) { // 获取总和 return rs.getDouble("total"); } } catch (SQLException e) { System.err.println("查询数据时出错: " + e.getMessage()); } } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } return null; } /** * @param tableName 表名称 * @return {@link Integer} */ public static Integer getDuration(String tableName) throws SQLException { // // 1. 连接数据库 // try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { // // 2. 执行查询语句 // String queryDataSql = "SELECT duration FROM " + tableName; // try (PreparedStatement pstmt = conn.prepareStatement(queryDataSql)) { // ResultSet rs = pstmt.executeQuery(); // while (rs.next()) { // // 获取总和 // return rs.getInt("duration"); // } // } catch (SQLException e) { // throw new RuntimeException(e); // } // } catch (SQLException e) { // System.err.println("SQLite连接失败: " + e.getMessage()); // } // 目前先根据文件的内容,手动给值,后续使用解析文件内容中的时间 // TODO: 2024/12/27 目前先根据文件的内容,手动给值,提高处理速度。后续使用解析文件内容中的时间。 return 1440; } /** * 分组查询 * * @param tableName 表名称 * @return {@link Double} */ public static List getNameList(String tableName) { // 1. 创建SQLite数据库连接 try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { if (conn != null) { // 2. 执行SQL查询 String queryDataSql = "SELECT station_name FROM " + tableName + " GROUP BY station_name"; // 3. 处理查询结果 try (PreparedStatement pstmt = conn.prepareStatement(queryDataSql)) { ResultSet rs = pstmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { // 获取总和 list.add(rs.getString("station_name")); } return list; } catch (SQLException e) { System.err.println("查询数据时出错: " + e.getMessage()); } } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } return null; } /** * 分组查询 * * @param tableName 表名称 * @return {@link Double} */ public static List getList(String tableName, String name) { // 1. 创建SQLite数据库连接 try (Connection conn = DriverManager.getConnection("jdbc:sqlite:rainfall.db")) { if (conn != null) { // 2. 执行SQL查询 String queryDataSql = "SELECT * FROM " + tableName + " WHERE station_name='" + name + "' ORDER BY id asc"; // 3. 处理查询结果 try (PreparedStatement pstmt = conn.prepareStatement(queryDataSql)) { ResultSet rs = pstmt.executeQuery(); List list = new ArrayList<>(); StationRainVo vo; while (rs.next()) { vo = new StationRainVo(); vo.setId(rs.getLong("id")); vo.setStationName(rs.getString("station_name")); vo.setRainfall(rs.getDouble("rainfall")); vo.setLatitude(rs.getDouble("latitude")); vo.setLongitude(rs.getDouble("longitude")); vo.setDatetime(rs.getString("datetime")); list.add(vo); } return list; } catch (SQLException e) { System.err.println("查询数据时出错: " + e.getMessage()); } } } catch (SQLException e) { System.err.println("SQLite连接失败: " + e.getMessage()); } return null; } }