package com.wgcloud.util.jdbc;
|
|
import com.mongodb.ConnectionString;
|
import com.mongodb.client.MongoClient;
|
import com.mongodb.client.MongoClients;
|
import com.mongodb.client.MongoDatabase;
|
import com.wgcloud.entity.DbInfo;
|
import com.wgcloud.service.DbInfoService;
|
import com.wgcloud.service.LogInfoService;
|
import com.wgcloud.util.staticvar.StaticKeys;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.Producer;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.datasource.DriverManagerDataSource;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.Properties;
|
|
/**
|
* @version v2.3
|
* @ClassName:ConnectionUtil.java
|
* @author: http://www.wgstart.com
|
* @date: 2019年11月16日
|
* @Description: ConnectionUtil.java
|
* @Copyright: 2017-2022 wgcloud. All rights reserved.
|
*/
|
@Component
|
public class ConnectionUtil {
|
private static final Logger logger = LoggerFactory.getLogger(ConnectionUtil.class);
|
@Resource
|
private LogInfoService logInfoService;
|
@Resource
|
private DbInfoService dbInfoService;
|
|
public JdbcTemplate getJdbcTemplate(DbInfo dbInfo) throws Exception {
|
JdbcTemplate jdbcTemplate = null;
|
String driver = "";
|
String url = "";
|
if ("mysql".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_mysql;
|
url = RDSConnection.url_mysql;
|
} else if ("kafka".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_kafka;
|
url = RDSConnection.url_kafka;
|
} else if ("dameng".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_dameng;
|
url = RDSConnection.url_dameng;
|
} else if ("mongodb".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_mongodb;
|
url = RDSConnection.url_mongodb;
|
} else if ("postgresql".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_postgresql;
|
url = RDSConnection.url_postgresql;
|
} else if ("sqlserver".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_sqlserver;
|
url = RDSConnection.url_sqlserver;
|
} else if ("db2".equals(dbInfo.getDbType())) {
|
driver = RDSConnection.driver_db2;
|
url = RDSConnection.url_db2;
|
} else {
|
driver = RDSConnection.driver_oracle;
|
url = RDSConnection.url_oracle;
|
}
|
url = url.replace("{ip}", dbInfo.getIp()).replace("{port}", dbInfo.getPort()).replace("{dbname}", dbInfo.getDbName());
|
try {
|
if ("kafka".equals(dbInfo.getDbType()) && getKafkaCount(dbInfo) > 0) {
|
return new JdbcTemplate();
|
}
|
if ("mongodb".equals(dbInfo.getDbType()) && getMongdbCount(dbInfo) > 0) {
|
return new JdbcTemplate();
|
}
|
|
//创建连接池
|
DriverManagerDataSource dataSource = new DriverManagerDataSource();
|
dataSource.setDriverClassName(driver);
|
dataSource.setUrl(url);
|
dataSource.setUsername(dbInfo.getUser());
|
dataSource.setPassword(dbInfo.getPasswd());
|
jdbcTemplate = new JdbcTemplate(dataSource);
|
if ("mysql".equals(dbInfo.getDbType())) {
|
jdbcTemplate.queryForRowSet(RDSConnection.MYSQL_VERSION);
|
} else if ("dameng".equals(dbInfo.getDbType())) {
|
jdbcTemplate.queryForRowSet(RDSConnection.DAMENG_VERSION);
|
} else if ("postgresql".equals(dbInfo.getDbType())) {
|
jdbcTemplate.queryForRowSet(RDSConnection.MYSQL_VERSION);
|
} else if ("sqlserver".equals(dbInfo.getDbType())) {
|
jdbcTemplate.queryForRowSet(RDSConnection.SQLSERVER_VERSION);
|
} else if ("db2".equals(dbInfo.getDbType())) {
|
jdbcTemplate.queryForRowSet(RDSConnection.DB2_VERSION);
|
} else {
|
jdbcTemplate.queryForRowSet(RDSConnection.ORACLE_VERSION);
|
}
|
dbInfo.setDbState("1");
|
dbInfoService.updateById(dbInfo);
|
return jdbcTemplate;
|
} catch (Exception e) {
|
jdbcTemplate = null;
|
logger.error("连接数据库错误", e);
|
logInfoService.save("连接数据库错误:" + dbInfo.getAliasName(), "IP:" + dbInfo.getIp() + ",端口:" + dbInfo.getPort() + ",数据库别名:"
|
+ dbInfo.getAliasName() + ",错误信息:" + e.toString(), StaticKeys.LOG_ERROR);
|
dbInfo.setDbState("2");
|
dbInfoService.updateById(dbInfo);
|
}
|
return null;
|
}
|
|
public long queryTableCount(DbInfo dbInfo, String sql) {
|
try {
|
if (dbInfo.getDbType().equals("kafka")) {
|
return getKafkaCount(dbInfo);
|
}
|
if (dbInfo.getDbType().equals("mongodb")) {
|
return getMongdbCount(dbInfo);
|
}
|
|
JdbcTemplate jdbcTemplate = getJdbcTemplate(dbInfo);
|
if (null == jdbcTemplate) {
|
return 0;
|
}
|
return jdbcTemplate.queryForObject(sql, Long.class);
|
} catch (Exception e) {
|
logger.error("统计数据表错误:", e);
|
logInfoService.save("统计数据表错误:" + dbInfo.getAliasName(), "IP:" + dbInfo.getIp() + ",端口:" + dbInfo.getPort() + ",数据库别名:"
|
+ dbInfo.getAliasName() + ",错误信息:" + e.toString(), StaticKeys.LOG_ERROR);
|
return 0;
|
}
|
}
|
|
private long getKafkaCount(DbInfo dbInfo) throws Exception {
|
String url = RDSConnection.url_kafka.replace("{ip}", dbInfo.getIp()).replace("{port}", dbInfo.getPort());
|
|
Properties props = new Properties();
|
props.put("max.block.ms", 1500);
|
props.put("bootstrap.servers", url);
|
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
Producer<String, String> producer = new KafkaProducer<>(props);
|
RecordMetadata records = producer.send(new ProducerRecord<>("topic_test", "key", "value")).get();
|
producer.close();
|
|
//KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
|
|
return 1L;
|
}
|
|
private long getMongdbCount(DbInfo dbInfo) throws Exception {
|
String url = RDSConnection.url_mongodb.replace("{ip}", dbInfo.getIp()).replace("{port}", dbInfo.getPort());
|
|
ConnectionString con = new ConnectionString(url);
|
MongoClient client = MongoClients.create(con);
|
|
MongoDatabase db = client.getDatabase(dbInfo.getDbName());
|
String name = db.getName();
|
|
return 1L;
|
}
|
}
|