Commit e1df994e authored by 杨书顺's avatar 杨书顺

测试同步hive数据到es表

parent e6f6ccab
......@@ -6,7 +6,7 @@
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="SynRedis" />
<module name="syn2redis" />
</profile>
</annotationProcessing>
</component>
......
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="FrameworkDetectionExcludesConfiguration">
<file type="web" url="file://$PROJECT_DIR$" />
</component>
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
......
This diff is collapsed.
......@@ -51,6 +51,29 @@
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>5.5.4</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.7</version>
</dependency>
<dependency>
<groupId>my.elasticsearch</groupId>
<artifactId>es-shaded</artifactId>
<version>1.0.1</version>
</dependency>
</dependencies>
<build>
......
package com.duiba.synredis.test;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
/**
* 同步hive表tmp.es_tuia_advert_app_adslot_coupon_di 数据到es表es_tuia_advert_app_adslot_coupon_di
*/
public class synhive2es {
private String hivetable = "tmp.es_tuia_advert_app_adslot_coupon_di";
private String sourceFiled = "slot_id,start_time,finish_time,advert_id,app_id,launch_pv,exposure_pv,click_pv,consumer_total,pay_launch_pv,pay_exposure_pv,pay_click_pv";
private String sourceFiledType = "bigint,string,string,bigint,bigint,bigint,bigint,bigint,bigint,bigint,bigint,bigint";
private Map<String, String> getHiveData(String tableName, String filedList,String queryDate) {
if (StringUtils.isNotBlank(filedList)) {
String[] filedStrs = filedList.split(",");
StringBuffer colBuffer = new StringBuffer();
for (String col : filedStrs) {
colBuffer.append(col).append(",");
}
String selectSql = colBuffer.substring(0, colBuffer.length() - 1);
String sql = String.format("select %s from %s where dt='%s'",selectSql,tableName,queryDate);
}
return null;
}
private Map<String, Map<String, String>> getEsData() {
return null;
}
private void writeEs(String index, String type, Map<String, Map<String, Object>> map) {
}
}
package com.duiba.synredis.util;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Map;
public class ElasticSearchUtil {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);
private volatile static ElasticSearchUtil instance = null;
private TransportClient client;
public static ElasticSearchUtil getInstance() {
if (instance == null) {
init();
}
return instance;
}
private synchronized static void init() {
if (instance == null) {
instance = new ElasticSearchUtil();
}
}
private ElasticSearchUtil() {
initClient();
}
private synchronized TransportClient initClient() {
if (client != null) {
return client;
}
try {
//加载配置文件
PropertyUtil instance = PropertyUtil.getInstance();
Settings settings = Settings.settingsBuilder().put("cluster.name", instance.getProperty("es.cluster.name"))
.put("client.transport.sniff", true).build();
String hostsports = instance.getProperty("es.hostsports");
if (StringUtils.isNotBlank(hostsports)) {
client = TransportClient.builder().settings(settings).build();
String[] hostsPortStrs = hostsports.split(",");
for (String hostport : hostsPortStrs) {
String[] strs = hostport.split(":");
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(strs[0]), Integer.valueOf(strs[1])));
}
}
return client;
} catch (Exception e) {
logger.error("initClient error", e);
}
return null;
}
public void batchInsertDoc(String index, String type, Map<String, Map<String, Object>> map) {
try{
if (StringUtils.isNotBlank(index) && StringUtils.isNotBlank(type) && null != map) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (String key : map.keySet()) {
bulkRequest.add(client.prepareIndex(index, type,key).setSource(map.get(key)));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error(bulkResponse.buildFailureMessage());
}
}
}
catch (Exception e){
logger.error("batchInsertDoc error.", e);
}
}
private void closeClient() {
try {
if (client != null) {
client.close();
}
} catch (Exception e) {
logger.error("closeClient error.", e);
}
}
public static void main(String[] args) throws Exception {
}
}
......@@ -16,4 +16,12 @@ hadoop.redis.auth=UjTD4apxUgu4xNVTnRAtqQt
hive.drivername=org.apache.hive.jdbc.HiveDriver
hive.url=jdbc:hive2://172.16.1.21:10000/default
hive.username=hive
hive.password=hive
\ No newline at end of file
hive.password=hive
#es cluster config
#es.cluster.name=bigdata-es
#es.hostsports=172.16.1.121:9300,172.16.1.122:9300,172.16.1.123:9300
#test-es cluster config
es.cluster.name=test-es
es.hostsports=172.16.1.141:9300,172.16.1.142:9300,172.16.1.143:9300
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment