spring boot内实现流式代理
# spring boot内实现流式代理
所谓流式,对于web前端来说就是event stream,对于Android来说就是用stream来接收响应体,retrofit上加@Streaming 注解.
从代理转发的角度上来看,就是一边接收,一边发送到下一端
nginx代理服务器本身就支持流式代理转发的功能
但是spring boot对流式的支持不好
比如使用的spring cloud gateway mvc库的ProxyExchange类, 就不支持流式.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gateway-mvc</artifactId>
<version>3.1.6</version>
</dependency>
1
2
3
4
5
2
3
4
5
使用这个库做普通转发,一般是接收完成后,然后发到下一端:
@PostMapping("/app-api/proxy/**")
public ResponseEntity<String> proxyPath(ProxyExchange<String> proxy) throws Exception {
String path = proxy.path("/app-api/proxy/");
return proxy.uri("https://77777.8888.top/" + path)
.header("host","")
.post();
}
1
2
3
4
5
6
7
2
3
4
5
6
7
要支持流式,还得用相对更底层一些的库,比如直接用okhttp:
private OkHttpClient okHttpClient;
@PostMapping(value = "/app-api/proxyStream/**")
@PreAuthenticated
public void proxy(HttpServletRequest request,
@RequestHeader Map<String, String> requestHeaders,
HttpServletResponse response ) throws IOException {
String url = request.getRequestURI();
String path = url.substring(url.indexOf("/app-api/proxyStream/")+"/app-api/proxyStream/".length());
StaticLog.debug("headers---> "+requestHeaders.toString());
url = "http://yyyyyyyy/"+ path;
StaticLog.debug("path---> "+path);
requestHeaders.remove("host");
requestHeaders.remove("referer");
requestHeaders.remove("Origin");
requestHeaders.remove("origin");
requestHeaders.remove("Authorization");
requestHeaders.remove("authorization");
Iterator<String> iterator = requestHeaders.keySet().iterator();
while (iterator.hasNext()){
String next = iterator.next();
if(next.startsWith("sec-")){
iterator.remove();
}
}
StaticLog.info("headers after remove 0---> "+requestHeaders.toString());
requestHeaders.put("host","yyy.uuuu.iiiii");
requestHeaders.put("Authorization","Bearer xxxxxx");
StaticLog.info("headers after remove---> "+requestHeaders.toString());
proxy(request, requestHeaders, response, url);
}
private void proxy(HttpServletRequest request,
Map<String, String> requestHeaders,
HttpServletResponse response, String url) throws IOException {
String queryString = request.getQueryString();
if (queryString != null) {
url += "?" + queryString;
}
//if(request.)
RequestBody requestBody = null;
InputStream inputStream0 = request.getInputStream();
if(inputStream0 != null){
String type = request.getHeader("content-type");
if(type ==null || type.isEmpty()){
type = "application/octet-stream";
}
requestBody = new InputStreamRequestBody(MediaType.parse(type),inputStream0);
//TODO inputStream.readAllBytes()是非stream操作, 如果是大文件上传,就会有性能问题和内存问题
/* byte[] bytes = inputStream.readAllBytes();
requestBody = RequestBody.create(bytes);*/
}
//todo 加上bare token
Request okHttpRequest = new Request.Builder()
.url(url)
.method(request.getMethod(),requestBody)
//.post(requestBody)
.headers(Headers.of(requestHeaders))
.build();
try (Response okHttpResponse = okHttpClient.newCall(okHttpRequest).execute()) {
// set response status code
response.setStatus(okHttpResponse.code());
// pass response headers
for (Map.Entry<String, List<String>> entry : okHttpResponse.headers().toMultimap().entrySet()) {
String key = entry.getKey();
List<String> value = entry.getValue();
////access-control-allow-origin: * : 会与外部重复,导致跨域问题,所以需要移除
//todo
if(!key.toLowerCase().startsWith("access-control")){
response.addHeader(key, value.get(0));
}
}
if(!okHttpResponse.isSuccessful()){
//todo
}
if(okHttpResponse.body() ==null){
return;
}
// set response content type
if( okHttpResponse.body().contentType() != null ){
response.setContentType(okHttpResponse.body().contentType().toString());
}
// streaming response body
try (InputStream inputStream = okHttpResponse.body().byteStream();
OutputStream outputStream = response.getOutputStream()) {
ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream();
// 一个inputStream往两个outputStream里写
OutputStream teeOutputStream = new TeeOutputStream(outputStream, outputStream2);
byte[] buffer = new byte[512];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
teeOutputStream.write(buffer, 0, bytesRead);
}
teeOutputStream.flush();
teeOutputStream.close();
byte[] bytes = outputStream2.toByteArray();
outputStream2.close();
String json = new String(bytes,0,bytes.length,"UTF-8");
//这里构建一个不影响流式响应的拦截/日志功能
StaticLog.info(json);
}
//ClassCastException: class java.util.LinkedHashMap cannot be cast to class [B (java.util.LinkedHashMap and
// [B are in module java.base of loader 'bootstrap')(String), java.lang.ClassCastException:
// class java.util.LinkedHashMap cannot be cast to class [B (java.util.LinkedHashMap and [B are in module java.base of loader 'bootstrap')
}catch (Throwable throwable){
StaticLog.error(throwable);
throw throwable;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
编辑 (opens new window)
上次更新: 2024/02/18, 17:27:05