Commit ace361fe authored by shenjiaqing's avatar shenjiaqing

提交代码

parent c66a1cc2
......@@ -8,6 +8,7 @@ buildscript {
repositories {
maven { url "http://nexus.dui88.com:8081/nexus/content/groups/public/" }
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/'}
maven { url "http://repo.spring.io/plugins-release" }
mavenCentral()
mavenLocal()
}
......@@ -37,7 +38,7 @@ allprojects {
}
group = "cn.com.duiba.boot"
version = "0.0.2"
version = "0.0.10"
}
subprojects {
......@@ -66,6 +67,10 @@ subprojects {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
dependency("org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:2.2.3.RELEASE")
dependency("org.springframework.boot:spring-boot-starter-web:${springBootVersion}")
dependency("org.apache.dubbo:dubbo-spring-boot-starter:2.7.4.1")
//工程第三方依赖
......@@ -77,6 +82,7 @@ subprojects {
//升级算法版本
dependency("cn.com.duiba.nezha-alg:alg-model:2.23.43")
dependency('io.github.openfeign:feign-httpclient:10.10.1.dbfixed')
}
}
......@@ -101,7 +107,7 @@ subprojects {
pom.project {
name project.name
packaging "jar"
description 'tuia-algo-ENGINE'
description 'spring-boot-starter-dsp'
url "www.duiba.com.cn"
scm {
......
rootProject.name = 'spring-boot-starter-dsp'
include 'spring-boot-starter-dsp-model'
include 'spring-boot-starter-dsp-sampler'
include 'spring-boot-starter-dsp-warmup'
......@@ -4,7 +4,6 @@ import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
......@@ -12,15 +11,10 @@ public class CustomRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
SamplerLogContext logSamplerTracer = SamplerLogThreadLocal.getContext().get();
if (Objects.isNull(logSamplerTracer) || Objects.isNull(logSamplerTracer.getPrintLogFlag())) {
if (!SamplerLog.infoFlag()) {
return;
}
if (logSamplerTracer.getPrintLogFlag()) {
template.header(SamplerLogConstant.RPC_SAMPLING_ID, logSamplerTracer.getSamplingId());
}
template.header(SamplerLogConstant.RPC_PRINT_LOG_FLAG, logSamplerTracer.getPrintLogFlag().toString());
template.header(SamplerLogConstant.RPC_SAMPLING_ID, SamplerLog.getSamplerId());
}
}
......@@ -20,23 +20,20 @@ public class DubboLogSamplerContextFilter implements Filter {
// 1、判断是消费者还是服务提供者
if (isConsumerSide) {
// 如果是消费者,将打印日志标识符set至上下文中
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_PRINT_LOG_FLAG, String.valueOf(SamplerLogThreadLocal.getContext().get().getPrintLogFlag()));
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID, String.valueOf(SamplerLogThreadLocal.getContext().get().getSamplingId()));
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID, SamplerLog.getSamplerId());
} else {
// 如果是服务提供者,从上下文取出
String traceContext = RpcContext.getContext().getAttachment(SamplerLogConstant.DUBBO_PRINT_LOG_FLAG);
String traceId = RpcContext.getContext().getAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID);
SamplerLogThreadLocal.set(Objects.equals("true", traceContext), traceId);
String samplingId = RpcContext.getContext().getAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID);
SamplerLog.setSamplerId(samplingId);
}
try {
return invoker.invoke(invocation);
} catch (Exception e) {
if (!(e instanceof RpcException)) {
log.warn("DubboLogSamplerContextFilter invoke warn", e);
} finally {
if (!isConsumerSide){
SamplerLog.end();
}
}
return null;
}
......
......@@ -9,7 +9,6 @@ import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.cloud.openfeign.FeignClient;
import java.util.Objects;
@Slf4j
@Configuration
......@@ -19,12 +18,12 @@ public class DuibaRpcContextParamsInterceptor extends HandlerInterceptorAdapter
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String printLogFlag = request.getHeader(SamplerLogConstant.RPC_PRINT_LOG_FLAG);
String adxTraceId = request.getHeader(SamplerLogConstant.RPC_SAMPLING_ID);
if (StringUtils.isBlank(printLogFlag)) {
String samplingId = request.getHeader(SamplerLogConstant.RPC_SAMPLING_ID);
if (StringUtils.isBlank(samplingId)) {
return true;
}
SamplerLogThreadLocal.set(Objects.equals("true", printLogFlag), adxTraceId);
SamplerLog.setSamplerId(samplingId);
return true;
}
}
......@@ -4,32 +4,48 @@ import com.alibaba.ttl.TransmittableThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
public class SamplerLog {
private static final Logger logger = LoggerFactory.getLogger(SamplerLog.class);
public static void startSampling(Integer samplingRate){
SamplerLogThreadLocal.setSamplingRate(samplingRate);
}
private static final TransmittableThreadLocal<String> LOCAL = new TransmittableThreadLocal<>();
public static boolean infoFlag() {
TransmittableThreadLocal<SamplerLogContext> threadLocal = SamplerLogThreadLocal.getContext();
private static final Random random = new Random();
if (Objects.isNull(threadLocal) || Objects.isNull(threadLocal.get()) || Objects.isNull(threadLocal.get().getPrintLogFlag())) {
return false;
public static void start(Integer samplingRate){
if (random.nextInt(samplingRate) != 0) {
return;
}
LOCAL.set(UUID.randomUUID().toString());
}
return threadLocal.get().getPrintLogFlag();
public static void end(){
LOCAL.remove();
}
public static void info(String format, Object... arguments) {
if (infoFlag()) {
logger.info("samplingId-" + SamplerLogThreadLocal.getContext().get().getSamplingId() + "," + format, arguments);
}
public static String getSamplerId(){
return LOCAL.get();
}
public static void setSamplerId(String samplerId) {
LOCAL.set(samplerId);
}
public static boolean infoFlag() {
return Objects.nonNull(LOCAL.get());
}
public static void info(String format, Object... arguments) {
try {
if (infoFlag()) {
logger.info("samplingId-" + LOCAL.get() + "," + format, arguments);
}
} catch (Exception e) {
logger.warn("SamplerLog info error", e);
}
}
}
......@@ -2,10 +2,6 @@ package cn.com.duiba.spring.boot.starter.dsp.sampler;
public class SamplerLogConstant {
public static String RPC_PRINT_LOG_FLAG = "rpcPrintLogFlag";
public static String DUBBO_PRINT_LOG_FLAG = "dubboPrintLogFlag";
public static String RPC_SAMPLING_ID = "rpcSamplingId";
public static String DUBBO_SAMPLING_ID = "dubboSamplingId";
......
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import lombok.Data;
@Data
public class SamplerLogContext {
/**
* 日志采样标识符,是否需要打印日志
* true:打印日志;false:不打印日志
*/
private Boolean printLogFlag;
/**
* 日志采样id
*/
private String samplingId;
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
public class SamplerLogThreadLocal {
private static final Random random = new Random();
private static final TransmittableThreadLocal<SamplerLogContext> logSamplerTracerThreadLocal = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<SamplerLogContext> getContext() {
return logSamplerTracerThreadLocal;
}
static void setSamplingRate(Integer samplingRate) {
if (Objects.isNull(samplingRate)) {
set(false, null);
return;
}
if (random.nextInt(samplingRate) == 0) {
set(true, UUID.randomUUID().toString());
return;
}
set(false, null);
}
static void set(boolean isPrint, String samplingId) {
SamplerLogContext logSamplerContext = new SamplerLogContext();
logSamplerContext.setPrintLogFlag(isPrint);
logSamplerContext.setSamplingId(samplingId);
logSamplerTracerThreadLocal.set(logSamplerContext);
}
}
dependencies {
provided "cn.com.duiba:wolf"
provided "cn.com.duiba.boot:spring-boot-ext-api"
provided "org.springframework:spring-web"
testCompile group: 'junit', name: 'junit', version: '4.12'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
compile "org.springframework.cloud:spring-cloud-starter-openfeign"
compile 'org.springframework.boot:spring-boot-autoconfigure'
compile "org.apache.commons:commons-lang3"
compile "org.springframework.boot:spring-boot-starter-cache"
compile 'io.github.openfeign:feign-httpclient'
compile "org.springframework.boot:spring-boot-starter-web"
compile "org.springframework.cloud:spring-cloud-starter-netflix-eureka-client"
}
\ No newline at end of file
package cn.com.duiba.spring.boot.starter.dsp.warmup;
import cn.com.duiba.boot.event.MainContextRefreshedEvent;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* @author MeiChang
* @Description
*/
@Configuration
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE)
public class CacheConfig {
private static final Logger logger = LoggerFactory.getLogger(CacheConfig.class);
@Autowired
private ConfigurableApplicationContext applicationContext;
@Resource(name = "eurekaClient")
private EurekaClient eurekaClient;
@Value("spring.application.name")
private String appName;
/**
* 本地缓存预热开关:默认关闭
*/
@Value("${local.cache.warmup.enable:false}")
private Boolean cacheEnable;
@EventListener(MainContextRefreshedEvent.class)
public void onMainContextRefreshed(){
String[] names = applicationContext.getBeanDefinitionNames();
for (String beanName : names) {
try {
Object bean = applicationContext.getBean(beanName);
Class<?> beanObjClass = bean.getClass();
Package pkg = beanObjClass.getPackage();
if (pkg == null) {
continue;
}
String packageName = pkg.getName();
if (packageName.startsWith("springfox.") || packageName.startsWith("org.")
|| packageName.startsWith("io.") || packageName.startsWith("net.")
|| packageName.startsWith("cn.com.duibaboot") || packageName.startsWith("com.netflix")) {
continue;
}
Field[] fields = bean.getClass().getDeclaredFields();
if (fields.length == 0) {
continue;
}
for (Field field : fields) {
boolean isCaffeine = AsyncCache.class.isAssignableFrom(field.getType());
if (!isCaffeine) {
continue;
}
field.setAccessible(true);
String keyName = String.format("%s-%s", beanName, field.getName());
try {
Object cache = field.get(bean);
if (!(cache instanceof AsyncCache)) {
continue;
}
ConcurrentMap<Object, Object> concurrentMap = ((AsyncCache) cache).synchronous().asMap();
CacheManager.saveCache(keyName, concurrentMap);
logger.info("beanName = {}, cache = {}, isCaffeine = {}, bean = {}", keyName, cache, isCaffeine, bean);
} catch (IllegalAccessException e) {
//ignore
} finally {
field.setAccessible(false);
}
}
}catch(Exception e){
logger.warn("", e);
}
}
}
// 顺序最靠后 等其他地方处理MainContextRefreshedEvent事件完成后
@EventListener(MainContextRefreshedEvent.class)
@Order(Ordered.LOWEST_PRECEDENCE-1)
public void onServerCache(){
try{
// 本地缓存预热开关 : 默认关闭
if (!cacheEnable) {
logger.warn("Cache preheating failed,because cacheEnable is false");
return;
}
List<InstanceInfo> instances = eurekaClient.getApplication(appName).getInstances();
logger.info("container of all registered AppInfo : {}",instances);
List<InstanceInfo> list = instances.stream().sorted(Comparator.comparing(InstanceInfo::getLastUpdatedTimestamp).reversed()).collect(Collectors.toList());
logger.info("container of all registered AppInfo : {}",list);
if (CollectionUtils.isEmpty(list)) {
logger.warn("Cache preheating failed,because 服务启动获取【" + appName + "】列表信息为空");
return;
}
String result = HttpRequestUtils.sendHttp(list.get(0).getHomePageUrl() + "/local/getCacheMap", null, appName);
if (StringUtils.isBlank(result)) {
logger.warn("Cache preheating failed,because request {} ,response {}", list.get(0).getHomePageUrl() + "/local/getCacheMap", result);
return;
}
ObjectMapper objectMapper = new ObjectMapper();
TypeFactory typeFactory = objectMapper.getTypeFactory();
Map<String, Object> map = objectMapper.convertValue(JSONObject.parse(result), typeFactory.constructMapType(Map.class, String.class, Object.class));
map.forEach((k, v) -> {
// beanName----cacheFieldName
String[] split = k.split("-");
Object bean = applicationContext.getBean(split[0]);
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
if (field.getName().equals(split[1]) && ((JSONObject) JSONObject.parse(JSON.toJSONString(v))).size() > 0) {
try {
field.setAccessible(true);
Object obj = field.get(bean);
if (!(obj instanceof AsyncCache)) {
continue;
}
AsyncCache asyncCache = (AsyncCache) obj;
// 通过MapType解析map结构
ParameterizedType genericType = (ParameterizedType) field.getGenericType();
Type[] actualTypeArguments = genericType.getActualTypeArguments();
MapType mapType = typeFactory.constructMapType(Map.class, (Class<?>) actualTypeArguments[0], Object.class);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Map convertMap = objectMapper.convertValue(v, mapType);
convertMap.forEach((k1, v1) -> {
// 通过JavaType解析map中的value结构
ParameterizedType genericType1 = (ParameterizedType) actualTypeArguments[1];
Type[] actualTypeArguments1 = genericType1.getActualTypeArguments();
JavaType javaType = typeFactory.constructType(actualTypeArguments1[0]);
asyncCache.put(k1, CompletableFuture.supplyAsync(() -> Optional.ofNullable(objectMapper.convertValue(v1, javaType))));
});
logger.info("Caffeine Cache key = {} ; Caffeine Cache Size = {}", k, asyncCache.synchronous().asMap().size());
field.setAccessible(false);
} catch (Exception e) {
logger.warn("this bean : {} , no have method : {}, Lead to it cannot be deserialized , cause: {}", split[0], field.getName() + "Serialize", e);
}
}
}
});
logger.info(JSON.toJSONString(map, SerializerFeature.PrettyFormat,SerializerFeature.WriteMapNullValue,SerializerFeature.WriteDateUseDateFormat));
} catch (Exception e) {
logger.warn("服务启动获取【" + appName +"】列表信息失败", e);
}
}
}
package cn.com.duiba.spring.boot.starter.dsp.warmup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ConcurrentMap;
/**
* @author MeiChang
* @Description
*/
@Slf4j
@RestController
public class CacheEndpoint {
@RequestMapping("/local/getCacheMap")
public ConcurrentMap<String, Object> getCacheMap(){
return CacheManager.getCacheMap();
}
}
package cn.com.duiba.spring.boot.starter.dsp.warmup;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentMap;
/** Cache对象管理器
* @author MeiChang
* @Description
*/
public class CacheManager {
/**
* 使用单例保存所有Cache对象
*/
private static final ConcurrentMap<String, Object> allCacheMap = Maps.newConcurrentMap();
static void saveCache(String key, Object value) {
if (StringUtils.isBlank(key)) {
return;
}
allCacheMap.put(key, value);
}
public static ConcurrentMap<String, Object> getCacheMap(){
return allCacheMap;
}
}
package cn.com.duiba.spring.boot.starter.dsp.warmup;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@Slf4j
public class HttpRequestUtils {
/**
* 发送get请求
* @param url
* @param param
* @return
*/
public static String sendHttp(String url , Map<String, String> param, String appName){
CloseableHttpClient httpClient = HttpClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(150).setSocketTimeout(150).setConnectionRequestTimeout(100).build())
.setMaxConnPerRoute(1)
.setMaxConnTotal(1000)
.evictExpiredConnections()//开启后台线程定时清理失效的连接,每隔10秒主动扫描并逐出超时的连接(超过keepAliveTimeout)
.setUserAgent(appName)
.setRetryHandler(new DefaultHttpRequestRetryHandler()) // 默认开启重试机制,重试3次
.disableCookieManagement()
.useSystemProperties()//for proxy
.disableRedirectHandling()
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy(){
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
long time = super.getKeepAliveDuration(response, context);
if(time == -1){
time = 30000;//链接最多空闲30秒
}
return time;
}
})
.build();
String resultString = "";
CloseableHttpResponse response = null;
try {
// 创建uri
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
// 创建http GET请求
HttpGet httpGet = new HttpGet(uri);
// 执行请求
response = httpClient.execute(httpGet);
// 判断返回状态是否为200
if (response.getStatusLine().getStatusCode() == 200) {
resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
}else {
log.warn("Cache preheating failed,because response {}", JSON.toJSONString(response, SerializerFeature.PrettyFormat));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return resultString;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment