Commit dba941c7 authored by shenjiaqing's avatar shenjiaqing

提交代码

parents
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
buildscript {
ext["duibaExtVersion"] = "2.0.0-h45"
ext["springBootVersion"] = "2.2.7.RELEASE"
ext["springCloudVersion"] = "Hoxton.SR6"
ext["hazelcast.version"] = "3.11"
ext['curator.version'] = '2.13.0'
repositories {
maven { url "http://nexus.dui88.com:8081/nexus/content/groups/public/" }
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/'}
mavenCentral()
mavenLocal()
}
dependencies {
classpath "io.spring.gradle:dependency-management-plugin:1.0.5.RELEASE"
classpath "org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}"
classpath "org.springframework.build.gradle:propdeps-plugin:0.0.7"
classpath "cn.com.duiba:duiba-gradle-plugin:0.1.9"
classpath "gradle.plugin.com.google.cloud.tools:jib-gradle-plugin:2.5.0"
classpath group: 'com.google.guava', name: 'guava', version: '29.0-jre'
}
}
allprojects {
apply plugin: "duiba.gradle.plugin"
apply plugin: "maven"
apply plugin: "java"
apply plugin: "idea"
apply plugin: "eclipse"
apply plugin: "jacoco"
apply plugin: "io.spring.dependency-management"
apply plugin: "propdeps"
test {
ignoreFailures = true
}
group = "cn.com.duiba.boot"
version = "0.0.3-SNAPSHOT"
}
subprojects {
sourceCompatibility = 1.8
targetCompatibility = 1.8
configurations{
all*.exclude group: "log4j", module:"log4j"
all*.exclude group: "org.slf4j", module: "slf4j-log4j12"
all*.exclude group: "javax.servlet", module: "servlet-api" //servlet 2.5
all*.exclude group: "com.alibaba", module: "dubbo"
all*.exclude group: "com.fasterxml.jackson.dataformat", module: "jackson-dataformat-xml"
}
dependencyManagement {
resolutionStrategy {
cacheChangingModulesFor 0, 'seconds'
}
dependencies {
imports {
mavenBom "cn.com.duiba.boot:spring-boot-ext-dependencies:${duibaExtVersion}"
mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootVersion}"
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
dependency("org.apache.dubbo:dubbo-spring-boot-starter:2.7.4.1")
//工程第三方依赖
dependency('org.apache.commons:commons-lang3:3.6')
dependency('com.alibaba:transmittable-thread-local:2.11.4')
//工程第三方依赖
dependency('org.projectlombok:lombok:1.18.12')
//升级算法版本
dependency("cn.com.duiba.nezha-alg:alg-model:2.23.43")
}
}
repositories {
maven { url "http://nexus.dui88.com:8081/nexus/content/groups/public/" }
mavenCentral()
mavenLocal()
}
uploadArchives {
repositories {
mavenDeployer {
snapshotRepository(url: "http://nexus.dui88.com:8081/nexus/content/repositories/snapshots/") {
authentication(userName: "admin", password: "admin123")
}
repository(url: "http://nexus.dui88.com:8081/nexus/content/repositories/releases/") {
authentication(userName: "admin", password: "admin123")
}
pom.project {
name project.name
packaging "jar"
description 'tuia-algo-ENGINE'
url "www.duiba.com.cn"
scm {
url ""
connection ""
developerConnection ""
}
licenses {
license {
name "No License"
url "http://www.duiba.com.cn"
distribution "repo"
}
}
developers {
developer {
id "xuhengfei"
name "Hengfei Xu"
}
}
}
}
}
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = "sources"
from sourceSets.main.allSource
}
artifacts { archives sourcesJar }
}
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
rootProject.name = 'spring-boot-starter-dsp'
include 'spring-boot-starter-dsp-model'
include 'spring-boot-starter-dsp-sampler'
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
compile ("cn.com.duiba.nezha-alg:alg-model")
compile "org.springframework.boot:spring-boot-starter-web"
}
package cn.com.duiba.spring.boot.starter.dsp.model.config;
import cn.com.duiba.spring.boot.starter.dsp.model.enums.ChooseTFModelStrategyEnum;
import lombok.Data;
@Data
public class TFModelConfiguration {
public static Integer strategy = ChooseTFModelStrategyEnum.B.getStrategy();
}
package cn.com.duiba.spring.boot.starter.dsp.model.enums;
public enum AlgoTFModelStatusEnum {
RUNNING(1, "运行中"),
STOP(2, "停止运行");
private final Integer code;
private final String description;
AlgoTFModelStatusEnum(Integer code, String description) {
this.code = code;
this.description = description;
}
public Integer getCode() {
return code;
}
public String getDescription() {
return description;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.enums;
public enum ChooseTFModelStrategyEnum {
A(1, "AAA"),
B(2, "BBB"),
C(3, "CCC"),
SMOOTH_STRATEGY(4, "固定速率策略"),
WARMING_UP_STRATEGY(5, "预热策略"),
RANDOM_STRATEGY(6, "随机策略"),
WEIGHT_RANDOM_STRATEGY(7, "权重随机策略"),
;
private Integer strategy;
private String description;
public Integer getStrategy() {
return strategy;
}
ChooseTFModelStrategyEnum(Integer strategy, String description) {
this.strategy = strategy;
this.description = description;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.model;
import cn.com.duiba.nezha.alg.model.tf.LocalTFModel;
import cn.com.duiba.spring.boot.starter.dsp.model.enums.AlgoTFModelStatusEnum;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
@Slf4j
@Data
public class AlgoTFModel {
private final static Long CLOSE_TF_MODEL_TIME = 120000L;
// tf模型
private LocalTFModel localTFModel;
// 最后一次访问的时间
private Long lastAccessTime;
// tf模型被加载的时间
private Long loadTime;
// 模型的状态
private int status;
private String name;
public AlgoTFModel(int status, String name){
this.status = status;
this.name = name;
}
public AlgoTFModel(LocalTFModel localTFModel, int status, String name){
this.localTFModel = localTFModel;
this.status = status;
this.loadTime = System.currentTimeMillis();
this.name = name;
}
public LocalTFModel getLocalTFModel(){
this.lastAccessTime = System.currentTimeMillis();
//CatTools.metricForCount("获取tf模型, name:" + name);
return localTFModel;
}
/**
* 判断当前tf模型是否为running状态
* @return boolean
*/
public boolean isRunning() {
return this.status == AlgoTFModelStatusEnum.RUNNING.getCode();
}
/**
* 判断当前tf模型是否可以关闭
* @return
*/
public boolean isAllowClose() {
return lastAccessTime != null && System.currentTimeMillis() - lastAccessTime > CLOSE_TF_MODEL_TIME;
}
public void doCloseTFModel() {
try {
localTFModel.close();
log.info("关闭tf模型, name:{}", name);
} catch (Exception e) {
log.warn("AlgoTFModel doClose error", e);
return;
}
localTFModel = null;
status = AlgoTFModelStatusEnum.STOP.getCode();
lastAccessTime = null;
loadTime = null;
}
public boolean isLatestVersion(LocalTFModel localTFModel, String tfKey) {
try {
Long lastVersion = localTFModel.getLastVersion(tfKey);
return !Objects.equals(String.valueOf(lastVersion), this.localTFModel.getVersion());
} catch (Exception e) {
log.warn("AlgoTFModel getLastVersion error", e);
}
return false;
}
public void loadTFModel(LocalTFModel localTFModel, String tfKey) {
try {
localTFModel.loadModel(tfKey);
log.info("加载tf模型, name:{}", name);
} catch (Exception e) {
log.warn("AlgoTFModel loadModel error", e);
return;
}
status = AlgoTFModelStatusEnum.RUNNING.getCode();
loadTime = System.currentTimeMillis();
this.localTFModel = localTFModel;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.service;
import cn.com.duiba.nezha.alg.model.tf.LocalTFModel;
public interface AlgoTFModelFactory {
/**
* 获取tf模型的入口
* @param tfKey
* @return
*/
LocalTFModel getTFModel(String tfKey);
}
package cn.com.duiba.spring.boot.starter.dsp.model.service;
import cn.com.duiba.nezha.alg.model.tf.LocalTFModel;
public interface AlgoTFModelProxy {
/**
* 获取tf模型
* @return LocalTFModel
*/
LocalTFModel chooseTFModel();
/**
* 关闭tf模型
*/
void closeTFModel();
/**
* 更新tf模型
*/
boolean updateTFModel();
/**
* 是否可以更新模型
* @return boolean
*/
boolean hasTwoRunningModel();
boolean needFlush();
}
package cn.com.duiba.spring.boot.starter.dsp.model.service.impl;
import cn.com.duiba.nezha.alg.model.tf.LocalTFModel;
import cn.com.duiba.spring.boot.starter.dsp.model.service.AlgoTFModelFactory;
import cn.com.duiba.spring.boot.starter.dsp.model.service.AlgoTFModelProxy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class AlgoTFModelFactoryImpl implements AlgoTFModelFactory {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
// 使用了享元模式,防止大量同一对象的创建,消耗大量内存空间
private final Map<String, AlgoTFModelProxy> proxyMap = new ConcurrentHashMap<>();
@Override
public LocalTFModel getTFModel(String tfKey) {
if (proxyMap.containsKey(tfKey)) {
AlgoTFModelProxy algoTFModelProxy = proxyMap.get(tfKey);
return algoTFModelProxy.chooseTFModel();
}
if (atomicInteger.compareAndSet(0, 1)) {
AlgoTFModelProxy proxy;
try {
proxy = new AlgoTFModelProxyImpl(tfKey);
proxyMap.putIfAbsent(tfKey, proxy);
} catch (Exception e) {
log.warn("AlgoTFModelProxy init error", e);
atomicInteger.set(0);
return null;
}
atomicInteger.set(0);
return proxy.chooseTFModel();
}
return null;
}
@Scheduled(fixedDelay = 2 * 60 * 1000)
void updateTFModelTask() {
if (MapUtils.isEmpty(proxyMap)) {
return;
}
closeTFModels();
if (isExistLoadingTFModels()) {
return;
}
updateTFModels();
}
/**
* 关闭TF模型:关闭超过2分钟没有被访问的tf模型
*/
private void closeTFModels() {
for (Map.Entry<String, AlgoTFModelProxy> entry : proxyMap.entrySet()) {
AlgoTFModelProxy algoTFModelProxy = entry.getValue();
if (Objects.isNull(algoTFModelProxy)) {
continue;
}
algoTFModelProxy.closeTFModel();
if (algoTFModelProxy.needFlush()) {
proxyMap.remove(entry.getKey());
}
}
}
private boolean isExistLoadingTFModels() {
for (AlgoTFModelProxy algoTFModelProxy : proxyMap.values()) {
if (Objects.isNull(algoTFModelProxy)) {
continue;
}
if (algoTFModelProxy.hasTwoRunningModel()){
return true;
}
}
return false;
}
/**
* 更新tf模型
*/
private void updateTFModels() {
List<AlgoTFModelProxy> algoTFModelProxies = new ArrayList<>(proxyMap.values());
algoTFModelProxies.sort(Comparator.comparingLong(x -> ((AlgoTFModelProxyImpl) x).getTfModelUpdateTime()));
for (AlgoTFModelProxy algoTFModelProxy : algoTFModelProxies) {
if (Objects.isNull(algoTFModelProxy)) {
continue;
}
if (algoTFModelProxy.updateTFModel()) {
return;
}
}
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.service.impl;
import cn.com.duiba.nezha.alg.model.tf.LocalTFModel;
import cn.com.duiba.spring.boot.starter.dsp.model.enums.AlgoTFModelStatusEnum;
import cn.com.duiba.spring.boot.starter.dsp.model.model.AlgoTFModel;
import cn.com.duiba.spring.boot.starter.dsp.model.service.AlgoTFModelProxy;
import cn.com.duiba.spring.boot.starter.dsp.model.support.ChooseTFModelStrategy;
import cn.com.duiba.spring.boot.starter.dsp.model.support.StrategyFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
@Slf4j
@Data
public class AlgoTFModelProxyImpl implements AlgoTFModelProxy {
private static Random random = new Random();
// 模型key
private String tfKey;
// 模型1
private AlgoTFModel algoTFModel1;
// 模型2
private AlgoTFModel algoTFModel2;
// 模型更新时间
private long tfModelUpdateTime;
AlgoTFModelProxyImpl(String tfKey) throws Exception {
this.tfKey = tfKey;
LocalTFModel localTFModel = new LocalTFModel();
try {
localTFModel.loadModel(tfKey);
} catch (Exception e) {
log.warn("localTFModel loadModel error...", e);
throw e;
}
// 初始化投入生产的模型
algoTFModel1 = new AlgoTFModel(localTFModel, AlgoTFModelStatusEnum.RUNNING.getCode(), tfKey + "-A");
// 初始化准备投入生产的模型
algoTFModel2 = new AlgoTFModel(AlgoTFModelStatusEnum.STOP.getCode(), tfKey + "-B");
}
@Override
public LocalTFModel chooseTFModel() {
// case1
if (!algoTFModel1.isRunning() && !algoTFModel2.isRunning()) {
return null;
}
// case2
if (algoTFModel1.isRunning() && !algoTFModel2.isRunning()) {
return algoTFModel1.getLocalTFModel();
}
// case3
if (!algoTFModel1.isRunning() && algoTFModel2.isRunning()) {
return algoTFModel2.getLocalTFModel();
}
AlgoTFModel newTFModel;
AlgoTFModel oldTFModel;
if (algoTFModel1.getLoadTime().compareTo(algoTFModel2.getLoadTime()) > 0) {
newTFModel = algoTFModel1;
oldTFModel = algoTFModel2;
} else {
newTFModel = algoTFModel2;
oldTFModel = algoTFModel1;
}
Long loadTime = newTFModel.getLoadTime();
if (System.currentTimeMillis() - loadTime >= 90000) {
return newTFModel.getLocalTFModel();
}
ChooseTFModelStrategy strategy = StrategyFactory.getStrategy();
if (strategy.tryAcquire(loadTime)) {
return newTFModel.getLocalTFModel();
}
return oldTFModel.getLocalTFModel();
}
@Override
public void closeTFModel() {
boolean allowClose1 = algoTFModel1.isAllowClose();
boolean allowClose2 = algoTFModel2.isAllowClose();
// 如果模型1超过5分钟没有被访问,进行关闭
if (allowClose1) {
algoTFModel1.doCloseTFModel();
}
// 如果模型2超过5分钟没有被访问,进行关闭
if (allowClose2) {
algoTFModel2.doCloseTFModel();
}
}
@Override
public boolean updateTFModel() {
// case1:模型1停止运行,并且模型2停止运行
// 则代表此key对应的模型已经停止使用了, 不做处理,继续下一个循环逻辑
if (!algoTFModel1.isRunning() && !algoTFModel2.isRunning()) {
return false;
}
// case2:模型1正在运行,模型2停止运行
// 则立刻加载新模型到模型2上
LocalTFModel localTFModel = new LocalTFModel();
if (algoTFModel1.isRunning() && !algoTFModel2.isRunning() && algoTFModel1.isLatestVersion(localTFModel, tfKey)) {
algoTFModel2.loadTFModel(localTFModel, tfKey);
tfModelUpdateTime = System.currentTimeMillis();
return true;
}
// case3:模型1停止运行,模型2正在运行
// 则立刻加载新模型到模型1上
if (!algoTFModel1.isRunning() && algoTFModel2.isRunning() && algoTFModel2.isLatestVersion(localTFModel, tfKey)) {
algoTFModel1.loadTFModel(localTFModel, tfKey);
tfModelUpdateTime = System.currentTimeMillis();
return true;
}
return false;
}
@Override
public boolean hasTwoRunningModel() {
return algoTFModel1.isRunning() && algoTFModel2.isRunning();
}
@Override
public boolean needFlush() {
return !algoTFModel1.isRunning() && !algoTFModel2.isRunning();
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
import java.util.Random;
/**
* 1.随机
* 2.权重随机
* 3.
* 4.固定速率
* 5.预热模式
* 6.全部使用新模型
*/
public abstract class ChooseTFModelStrategy {
protected static Random random = new Random();
public abstract boolean tryAcquire(Long loadTime);
protected int getWeight(Long loadTime) {
return (int) ((System.currentTimeMillis() - loadTime) / 900);
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
public class RandomStrategy extends ChooseTFModelStrategy {
@Override
public boolean tryAcquire(Long loadTime) {
return random.nextInt(2) == 1;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
import com.google.common.util.concurrent.RateLimiter;
/**
* 固定速率增加预热模型的数量
*/
public class SmoothStrategy extends ChooseTFModelStrategy {
private RateLimiter limiter = RateLimiter.create(1);
@Override
public boolean tryAcquire(Long loadTime) {
return limiter.tryAcquire();
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
/**
* 慢慢的增加
*/
public class StrategyA extends ChooseTFModelStrategy {
@Override
public boolean tryAcquire(Long loadTime) {
double v = random.nextDouble() * 100;
double sqrt = Math.sqrt(getWeight(loadTime));
return v <= sqrt;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
public class StrategyB extends ChooseTFModelStrategy {
@Override
public boolean tryAcquire(Long loadTime) {
double v = random.nextDouble() * 100;
double sqrt = Math.cbrt(getWeight(loadTime));
return v <= sqrt;
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
public class StrategyC extends ChooseTFModelStrategy {
@Override
public boolean tryAcquire(Long loadTime) {
long deltaTime = System.currentTimeMillis() - loadTime;
double v = (double)(deltaTime / 10000);
return random.nextDouble() * 100 <= Math.sqrt(v);
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
import cn.com.duiba.spring.boot.starter.dsp.model.config.TFModelConfiguration;
import cn.com.duiba.spring.boot.starter.dsp.model.enums.ChooseTFModelStrategyEnum;
import java.util.HashMap;
import java.util.Map;
public class StrategyFactory {
private static final Map<Integer, ChooseTFModelStrategy> strategyMap = new HashMap<>();
static {
strategyMap.put(ChooseTFModelStrategyEnum.A.getStrategy(), new StrategyA());
strategyMap.put(ChooseTFModelStrategyEnum.B.getStrategy(), new StrategyB());
strategyMap.put(ChooseTFModelStrategyEnum.C.getStrategy(), new StrategyC());
strategyMap.put(ChooseTFModelStrategyEnum.SMOOTH_STRATEGY.getStrategy(), new SmoothStrategy());
strategyMap.put(ChooseTFModelStrategyEnum.WARMING_UP_STRATEGY.getStrategy(), new WarmingUpStrategy());
strategyMap.put(ChooseTFModelStrategyEnum.RANDOM_STRATEGY.getStrategy(), new RandomStrategy());
strategyMap.put(ChooseTFModelStrategyEnum.WEIGHT_RANDOM_STRATEGY.getStrategy(), new WeightRandomStrategy());
}
public static ChooseTFModelStrategy getStrategy() {
Integer strategy = TFModelConfiguration.strategy;
if (!strategyMap.containsKey(strategy)) {
return strategyMap.get(ChooseTFModelStrategyEnum.B.getStrategy());
}
return strategyMap.get(strategy);
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
/**
* 匀速增加预热模型的数量
*/
public class WarmingUpStrategy extends ChooseTFModelStrategy {
private RateLimiter limiter = RateLimiter.create(10, 2, TimeUnit.SECONDS);
@Override
public boolean tryAcquire(Long loadTime) {
return limiter.tryAcquire();
}
}
package cn.com.duiba.spring.boot.starter.dsp.model.support;
public class WeightRandomStrategy extends ChooseTFModelStrategy {
@Override
public boolean tryAcquire(Long loadTime) {
return random.nextInt(100) <= getWeight(loadTime);
}
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
compile "org.springframework.boot:spring-boot-starter-web"
compile "org.springframework.cloud:spring-cloud-starter-openfeign"
compile 'org.springframework.boot:spring-boot-autoconfigure'
compile "org.apache.dubbo:dubbo-spring-boot-starter"
compile "com.alibaba:transmittable-thread-local"
compile "org.apache.commons:commons-lang3"
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
public class CustomRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
SamplerLogContext logSamplerTracer = SamplerLogThreadLocal.getContext().get();
if (Objects.isNull(logSamplerTracer) || Objects.isNull(logSamplerTracer.getPrintLogFlag())) {
return;
}
if (logSamplerTracer.getPrintLogFlag()) {
template.header(SamplerLogConstant.RPC_SAMPLING_ID, logSamplerTracer.getSamplingId());
}
template.header(SamplerLogConstant.RPC_PRINT_LOG_FLAG, logSamplerTracer.getPrintLogFlag().toString());
}
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import java.util.Objects;
@Slf4j
@Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -2000)
public class DubboLogSamplerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String sideKey = url.getParameter("side");
boolean isConsumerSide = Objects.equals(sideKey, CommonConstants.CONSUMER);
// 1、判断是消费者还是服务提供者
if (isConsumerSide) {
// 如果是消费者,将打印日志标识符set至上下文中
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_PRINT_LOG_FLAG, String.valueOf(SamplerLogThreadLocal.getContext().get().getPrintLogFlag()));
RpcContext.getContext().setAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID, String.valueOf(SamplerLogThreadLocal.getContext().get().getSamplingId()));
} else {
// 如果是服务提供者,从上下文取出
String traceContext = RpcContext.getContext().getAttachment(SamplerLogConstant.DUBBO_PRINT_LOG_FLAG);
String traceId = RpcContext.getContext().getAttachment(SamplerLogConstant.DUBBO_SAMPLING_ID);
SamplerLogThreadLocal.set(Objects.equals("true", traceContext), traceId);
}
try {
return invoker.invoke(invocation);
} catch (Exception e) {
if (!(e instanceof RpcException)) {
log.warn("DubboLogSamplerContextFilter invoke warn", e);
}
}
return null;
}
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.cloud.openfeign.FeignClient;
import java.util.Objects;
@Slf4j
@Configuration
@ConditionalOnClass({FeignClient.class})
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
public class DuibaRpcContextParamsInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String printLogFlag = request.getHeader(SamplerLogConstant.RPC_PRINT_LOG_FLAG);
String adxTraceId = request.getHeader(SamplerLogConstant.RPC_SAMPLING_ID);
if (StringUtils.isBlank(printLogFlag)) {
return true;
}
SamplerLogThreadLocal.set(Objects.equals("true", printLogFlag), adxTraceId);
return true;
}
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
@Configuration
@ConditionalOnClass({FeignClient.class})
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
@Import(DuibaRpcContextParamsInterceptor.class)
public class RpcContextParamsInterceptorConfig {
@Bean
public InterceptorRegisterConfig interceptorRegister(){
return new InterceptorRegisterConfig();
}
static class InterceptorRegisterConfig implements WebMvcConfigurer, Ordered {
@Resource
private DuibaRpcContextParamsInterceptor interceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(interceptor).addPathPatterns("/**");
}
@Override
public int getOrder() {
return -100;
}
}
}
\ No newline at end of file
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import com.alibaba.ttl.TransmittableThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
public class SamplerLog {
private static final Logger logger = LoggerFactory.getLogger(SamplerLog.class);
public static void startSampling(Integer samplingRate){
SamplerLogThreadLocal.setSamplingRate(samplingRate);
}
public static boolean infoFlag() {
TransmittableThreadLocal<SamplerLogContext> threadLocal = SamplerLogThreadLocal.getContext();
if (Objects.isNull(threadLocal) || Objects.isNull(threadLocal.get()) || Objects.isNull(threadLocal.get().getPrintLogFlag())) {
return false;
}
return threadLocal.get().getPrintLogFlag();
}
public static void info(String format, Object... arguments) {
if (infoFlag()) {
logger.info("samplingId-" + SamplerLogThreadLocal.getContext().get().getSamplingId() + "," + format, arguments);
}
}
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
public class SamplerLogConstant {
public static String RPC_PRINT_LOG_FLAG = "rpcPrintLogFlag";
public static String DUBBO_PRINT_LOG_FLAG = "dubboPrintLogFlag";
public static String RPC_SAMPLING_ID = "rpcSamplingId";
public static String DUBBO_SAMPLING_ID = "dubboSamplingId";
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import lombok.Data;
@Data
public class SamplerLogContext {
/**
* 日志采样标识符,是否需要打印日志
* true:打印日志;false:不打印日志
*/
private Boolean printLogFlag;
/**
* 日志采样id
*/
private String samplingId;
}
package cn.com.duiba.spring.boot.starter.dsp.sampler;
import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
public class SamplerLogThreadLocal {
private static final Random random = new Random();
private static final TransmittableThreadLocal<SamplerLogContext> logSamplerTracerThreadLocal = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<SamplerLogContext> getContext() {
return logSamplerTracerThreadLocal;
}
static void setSamplingRate(Integer samplingRate) {
if (Objects.isNull(samplingRate)) {
set(false, null);
return;
}
if (random.nextInt(samplingRate) == 0) {
set(true, UUID.randomUUID().toString());
return;
}
set(false, null);
}
static void set(boolean isPrint, String samplingId) {
SamplerLogContext logSamplerContext = new SamplerLogContext();
logSamplerContext.setPrintLogFlag(isPrint);
logSamplerContext.setSamplingId(samplingId);
logSamplerTracerThreadLocal.set(logSamplerContext);
}
}
dubboLogSamplerContextFilter=cn.com.duiba.spring.boot.starter.dsp.sampler.DubboLogSamplerContextFilter
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.com.duiba.spring.boot.starter.dsp.sampler.RpcContextParamsInterceptorConfig
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment