Commit a4d5a4f5 authored by shenjiaqing's avatar shenjiaqing

提交代码

parent a65410c5
...@@ -38,7 +38,7 @@ allprojects { ...@@ -38,7 +38,7 @@ allprojects {
} }
group = "cn.com.duiba.boot" group = "cn.com.duiba.boot"
version = "0.0.44" version = "0.0.49"
} }
subprojects { subprojects {
...@@ -81,7 +81,7 @@ subprojects { ...@@ -81,7 +81,7 @@ subprojects {
dependency('org.projectlombok:lombok:1.18.12') dependency('org.projectlombok:lombok:1.18.12')
//升级算法版本 //升级算法版本
dependency("cn.com.duiba.nezha-alg:alg-model:2.28.7") dependency("cn.com.duiba.nezha-alg:alg-model:2.28.11-SNAPSHOT")
dependency('io.github.openfeign:feign-httpclient:10.10.1.dbfixed') dependency('io.github.openfeign:feign-httpclient:10.10.1.dbfixed')
//log4j fix //log4j fix
......
...@@ -18,9 +18,5 @@ public class Commons { ...@@ -18,9 +18,5 @@ public class Commons {
return redisScript; return redisScript;
} }
@Bean
public TokenBucketLimiter createTokenBucketLimiter() {
return new TokenBucketLimiter();
}
} }
...@@ -8,9 +8,14 @@ import org.aspectj.lang.reflect.MethodSignature; ...@@ -8,9 +8,14 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
...@@ -27,6 +32,7 @@ public class LimitAspect { ...@@ -27,6 +32,7 @@ public class LimitAspect {
private final static Integer REDIS_NODE_NUM = 4; private final static Integer REDIS_NODE_NUM = 4;
static { static {
map.put("rate.limit:cn.com.duiba.tuia.algo.engine.adx.service.v4.AdxAlgoService-execute-limit1", new AtomicLong()); map.put("rate.limit:cn.com.duiba.tuia.algo.engine.adx.service.v4.AdxAlgoService-execute-limit1", new AtomicLong());
map.put("rate.limit:cn.com.duiba.tuia.algo.engine.adx.service.v4.AdxAlgoService-aaa-limit1", new AtomicLong());
redisNodeIndex.put(0L, 14); redisNodeIndex.put(0L, 14);
redisNodeIndex.put(1L, 18); redisNodeIndex.put(1L, 18);
redisNodeIndex.put(2L, 296); redisNodeIndex.put(2L, 296);
...@@ -48,8 +54,11 @@ public class LimitAspect { ...@@ -48,8 +54,11 @@ public class LimitAspect {
@Autowired @Autowired
private RateLimitProperties rateLimitProperties; private RateLimitProperties rateLimitProperties;
@Autowired @Resource
private TokenBucketLimiter tokenBucketLimiter; private StringRedisTemplate stringRedisTemplate;
@Resource
private DefaultRedisScript<Long> redisLuaScript;
@Pointcut(value = "@annotation(cn.com.duiba.spring.boot.starter.dsp.rateLimiter.RateLimit)") @Pointcut(value = "@annotation(cn.com.duiba.spring.boot.starter.dsp.rateLimiter.RateLimit)")
public void rateLimitPointcut() { public void rateLimitPointcut() {
...@@ -86,8 +95,9 @@ public class LimitAspect { ...@@ -86,8 +95,9 @@ public class LimitAspect {
if (limitCount <= 0) { if (limitCount <= 0) {
throw new RuntimeException("已经到设置限流次数"); throw new RuntimeException("已经到设置限流次数");
} }
TokenBucketLimiterPolicy tokenBucketLimiterPolicy = new TokenBucketLimiterPolicy(limitCount, 200); Long number = stringRedisTemplate.execute(redisLuaScript, Collections.singletonList(redisKey), String.valueOf(limitCount), String.valueOf(rateLimit.time()));
if (tokenBucketLimiter.acquire(redisKey, tokenBucketLimiterPolicy)) { if (Objects.nonNull(number) && number > 0) {
logger.info("限流时间段内访问第:{} 次", number);
return joinPoint.proceed(); return joinPoint.proceed();
} }
...@@ -95,4 +105,6 @@ public class LimitAspect { ...@@ -95,4 +105,6 @@ public class LimitAspect {
throw new RuntimeException("已经到设置限流次数"); throw new RuntimeException("已经到设置限流次数");
} }
} }
package cn.com.duiba.spring.boot.starter.dsp.rateLimiter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
public class TokenBucketLimiter {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private DefaultRedisScript<Long> redisLuaScript;
boolean acquire(String key, TokenBucketLimiterPolicy policy) {
if (policy == null) {
return true;
}
Long remain = stringRedisTemplate.execute(redisLuaScript, Lists.asList(key, new String[]{}), policy.toParams());
return Objects.nonNull(remain) && remain > 0;
}
}
package cn.com.duiba.spring.boot.starter.dsp.rateLimiter;
import com.google.common.collect.Lists;
import lombok.Data;
import java.util.List;
/**
* 令牌桶限流器的执行对象
*/
@Data
public class TokenBucketLimiterPolicy {
/**
* 限流时间间隔
* (重置桶内令牌的时间间隔)
*/
private long resetBucketInterval;
/**
* 最大令牌数量
*/
private long bucketMaxTokens;
/**
* 初始可存储数量
*/
private long initTokens;
/**
* 每个令牌产生的时间
*/
private long intervalPerPermit;
public TokenBucketLimiterPolicy(long bucketMaxTokens, long maxBurstTime) {
// 最大令牌数
this.bucketMaxTokens = bucketMaxTokens;
// 限流时间间隔
this.resetBucketInterval = 1000;
if (bucketMaxTokens >= 200) {
intervalPerPermit = 50;
initTokens = bucketMaxTokens / 50;
return;
}
// 令牌的产生间隔 = 限流时间 / 最大令牌数
intervalPerPermit = resetBucketInterval / bucketMaxTokens;
// 初始令牌数 = 最大的突发流量的持续时间 / 令牌产生间隔
// 用 最大的突发流量的持续时间 计算的结果更加合理,并不是每次初始化都要将桶装满
initTokens = Math.min(Math.max(maxBurstTime * bucketMaxTokens / resetBucketInterval, 1), bucketMaxTokens);
}
public String[] toParams() {
List<String > list = Lists.newArrayList();
list.add(String.valueOf(getIntervalPerPermit()));
list.add(String.valueOf(System.currentTimeMillis()));
list.add(String.valueOf(getInitTokens()));
list.add(String.valueOf(getBucketMaxTokens()));
list.add(String.valueOf(getResetBucketInterval()));
return list.toArray(new String[]{});
}
}
local intervalPerTokens = tonumber(ARGV[1]) --Lua脚本
local curTime = tonumber(ARGV[2])
local initTokens = tonumber(ARGV[3]) --- 时间窗最大并发数
local bucketMaxTokens = tonumber(ARGV[4]) local limit = tonumber(ARGV[1])
local resetBucketInterval = tonumber(ARGV[5]) --- 时间窗内当前并发数
local current = tonumber(redis.call('get', KEYS[1]) or "0")
local bucket = redis.call('hgetall', KEYS[1]) --如果超出限流大小
local currentTokens if current + 1 > limit then
return 0
-- 若当前桶未初始化,先初始化令牌桶 else --请求数+1,并设置2秒过期
if table.maxn(bucket) == 0 then redis.call("INCRBY", KEYS[1],"1")
-- 初始桶内令牌 redis.call("expire", KEYS[1],"2")
currentTokens = initTokens return current + 1
-- 设置桶最近的填充时间是当前
redis.call('hset', KEYS[1], 'lastRefillTime', curTime)
-- 初始化令牌桶的过期时间, 设置为间隔的 1.5 倍
redis.call('pexpire', KEYS[1], resetBucketInterval * 1.5)
-- 若桶已初始化,开始计算桶内令牌
-- 为什么等于 4 ? 因为有两对 field, 加起来长度是 4
-- { "lastRefillTime(上一次更新时间)","curTime(更新时间值)","tokensRemaining(当前保留的令牌)","令牌数" }
elseif table.maxn(bucket) == 4 then
-- 上次填充时间
local lastRefillTime = tonumber(bucket[2])
-- 剩余的令牌数
local tokensRemaining = tonumber(bucket[4])
-- 当前时间大于上次填充时间
if curTime > lastRefillTime then
-- 拿到当前时间与上次填充时间的时间间隔
-- 举例理解: curTime = 2620 , lastRefillTime = 2000, intervalSinceLast = 620
local intervalSinceLast = curTime - lastRefillTime
-- 如果当前时间间隔 大于 令牌的生成间隔
-- 举例理解: intervalSinceLast = 620, resetBucketInterval = 1000
if intervalSinceLast > resetBucketInterval then
-- 将当前令牌填充满
currentTokens = initTokens
-- 更新重新填充时间
redis.call('hset', KEYS[1], 'lastRefillTime', curTime)
-- 如果当前时间间隔 小于 令牌的生成间隔
else
-- 可授予的令牌 = 向下取整数( 上次填充时间与当前时间的时间间隔 / 两个令牌许可之间的时间间隔 )
-- 举例理解 : intervalPerTokens = 200 ms , 令牌间隔时间为 200ms
-- intervalSinceLast = 620 ms , 当前距离上一个填充时间差为 620ms
-- grantedTokens = 620/200 = 3.1 = 3
local grantedTokens = math.floor(intervalSinceLast / intervalPerTokens)
-- 可授予的令牌 > 0 时
-- 举例理解 : grantedTokens = 620/200 = 3.1 = 3
if grantedTokens > 0 then
-- 生成的令牌 = 上次填充时间与当前时间的时间间隔 % 两个令牌许可之间的时间间隔
-- 举例理解 : padMillis = 620%200 = 20
-- curTime = 2620
-- curTime - padMillis = 2600
local padMillis = math.fmod(intervalSinceLast, intervalPerTokens)
-- 将当前令牌桶更新到上一次生成时间
redis.call('hset', KEYS[1], 'lastRefillTime', curTime - padMillis)
end
-- 更新当前令牌桶中的令牌数
-- Math.min(根据时间生成的令牌数 + 剩下的令牌数, 桶的限制) => 超出桶最大令牌的就丢弃
currentTokens = math.min(grantedTokens + tokensRemaining, bucketMaxTokens)
end
else
-- 如果当前时间小于或等于上次更新的时间, 说明刚刚初始化, 当前令牌数量等于桶内令牌数
-- 不需要重新填充
currentTokens = tokensRemaining
end
end
-- 如果当前桶内令牌小于 0,抛出异常
assert(currentTokens >= 0)
-- 如果当前令牌 == 0 ,更新桶内令牌, 返回 0
if currentTokens == 0 then
redis.call('hset', KEYS[1], 'tokensRemaining', currentTokens)
return 0
else
-- 如果当前令牌 大于 0, 更新当前桶内的令牌 -1 , 再返回当前桶内令牌数
redis.call('hset', KEYS[1], 'tokensRemaining', currentTokens - 1)
return currentTokens
end end
...@@ -2,16 +2,21 @@ package cn.com.duiba.spring.boot.starter.dsp.sampler.filter; ...@@ -2,16 +2,21 @@ package cn.com.duiba.spring.boot.starter.dsp.sampler.filter;
import cn.com.duiba.spring.boot.starter.dsp.sampler.SamplerLog; import cn.com.duiba.spring.boot.starter.dsp.sampler.SamplerLog;
import cn.com.duiba.spring.boot.starter.dsp.sampler.constants.SamplerLogConstant; import cn.com.duiba.spring.boot.starter.dsp.sampler.constants.SamplerLogConstant;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects; import java.util.Objects;
@Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -2000) @Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -2000)
public class DubboLogSamplerContextFilter implements Filter { public class DubboLogSamplerContextFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(DubboLogSamplerContextFilter.class);
@Override @Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl(); URL url = invoker.getUrl();
...@@ -19,8 +24,9 @@ public class DubboLogSamplerContextFilter implements Filter { ...@@ -19,8 +24,9 @@ public class DubboLogSamplerContextFilter implements Filter {
boolean isConsumerSide = Objects.equals(sideKey, CommonConstants.CONSUMER); boolean isConsumerSide = Objects.equals(sideKey, CommonConstants.CONSUMER);
// 1、判断是消费者还是服务提供者 // 1、判断是消费者还是服务提供者
if (isConsumerSide) { if (isConsumerSide && StringUtils.isNotBlank(SamplerLog.getSamplerId())) {
// 如果是消费者,将打印日志标识符set至上下文中 // 如果是消费者,将打印日志标识符set至上下文中
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID, SamplerLog.getSamplerId()); RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID, SamplerLog.getSamplerId());
} else { } else {
// 如果是服务提供者,从上下文取出 // 如果是服务提供者,从上下文取出
...@@ -36,6 +42,8 @@ public class DubboLogSamplerContextFilter implements Filter { ...@@ -36,6 +42,8 @@ public class DubboLogSamplerContextFilter implements Filter {
} }
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment