package com.se.nsl.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; /** * 自定义 Web 客户端 * * @author xingjinshuang@smartearth.cn * @date 2024/06/27 */ public class CustomWebClient { private static final Logger logger = LoggerFactory.getLogger(CustomWebClient.class); private static final WebClient webClient; static { // 在静态代码块中实例化WebClient.Builder并构建WebClient对象 webClient = WebClient.builder().build(); } //====基础请求===================================================================================================================================== public static Mono get(String url) { return webClient.get() .uri(url) .retrieve() .bodyToMono(String.class); } public static Mono post(String url, Object requestBody) { return webClient.post() .uri(url) .bodyValue(requestBody) .retrieve() .bodyToMono(String.class); } public static Mono postForm(String url, MultiValueMap formData) { return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .body(BodyInserters.fromFormData(formData)) .retrieve() .bodyToMono(String.class); } public static Mono put(String url, Object requestBody) { return webClient.put() .uri(url) .bodyValue(requestBody) .retrieve() .bodyToMono(String.class); } // 发送PUT请求 public static Mono put(String url, R requestBody, Class responseType) { return webClient.put() .uri(url) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(requestBody)) .retrieve() .bodyToMono(responseType); } public static Mono delete(String url) { return webClient.delete() .uri(url) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomWebClientException1("Client error: " + clientResponse.statusCode()))) .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomWebClientException1("Server error: " + clientResponse.statusCode()))) .bodyToMono(String.class); } //=======自定义返回类型的请求=================================================================================================================================== public static Mono getAndParse(String url, Class responseType) { return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(responseType); } public static Mono postAndParse(String url, R requestBody, Class responseType) { return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(requestBody)) .retrieve() .bodyToMono(responseType); } /** * toBodilessEntity下载或者删除的时候不需要对响应体处理的 * toBodilessEntity() 是 Java Play Framework 中 WebClient 类的一个方法,它用于将响应转换为没有体的响应实体。 * 通常,我们在处理不需要读取响应体的大型下载时,可以使用这个方法来避免不必要的内存占用。 */ public static Mono postAndReceiveLocation(String url, Object requestBody) { return webClient.post() .uri(url) .bodyValue(requestBody) .retrieve() .toBodilessEntity() .flatMap(response -> { if (response.getHeaders().getLocation() != null) { return Mono.just(response.getHeaders().getLocation().toString()); } else { // 如果Location为空,返回response return Mono.just(response.getStatusCode().toString()); } }); } public static Mono postFormAndReceiveLocation(String url, MultiValueMap formData) { return webClient.post() .uri(url) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .body(BodyInserters.fromFormData(formData)) .retrieve() .toBodilessEntity() .flatMap(response -> { if (response.getHeaders().getLocation() != null) { return Mono.just(response.getHeaders().getLocation().toString()); } else { // 如果Location为空,返回response的状态码 return Mono.just(response.getStatusCode().toString()); } }); } //========================================================================================================================================== // 异步请求的GET、POST方式 public static Mono getAsMono(String path, Class responseType) { return webClient.get() .uri(path) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(responseType); } public static Mono postAsMono(String path, Object requestBody, Class responseType) { return webClient.post() .uri(path) .contentType(MediaType.APPLICATION_JSON) .body(Mono.just(requestBody), requestBody.getClass()) .retrieve() .bodyToMono(responseType); } public static CompletableFuture getAsFuture(String path, Class responseType) { return webClient .method(HttpMethod.GET) .uri(path) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(responseType) .toFuture(); } public static CompletableFuture postAsFuture(String path, Object requestBody, HashMap headers, Class responseType) { return webClient .method(HttpMethod.POST) .uri(path) .contentType(MediaType.APPLICATION_JSON) .headers(h -> headers.forEach(h::add)) .bodyValue(requestBody) .retrieve() .bodyToMono(responseType) .toFuture(); } //========================================================================================================================================== // 其他方式一发起请求 private static final WebClient WEB_CLIENT = WebClient.create(); /** * 发起GET请求,支持Get parameter */ public static CompletableFuture getParam(String url, HttpHeaders headers, MultiValueMap queryParams) { return Mono.from(WEB_CLIENT.get() .uri(uriBuilder -> uriBuilder .path(url) .queryParams(queryParams) .build()) .headers(httpHeaders -> httpHeaders.putAll(headers)) //.headers(h -> headers.forEach(h::add)) .retrieve() .onStatus(HttpStatus::isError, clientResponse -> Mono.error(new RuntimeException("HTTP error status: " + clientResponse.statusCode()))) .bodyToMono(String.class)) .onErrorResume(error -> Mono.just("Error: " + error.getMessage())) // 如果有错误,返回错误信息 .toFuture(); } /** * 发起GET请求,支持Get parameter * 可以用 */ public static CompletableFuture getNoParam(String url, HttpHeaders headers) { return Mono.from(WEB_CLIENT.get() .uri(url) .headers(httpHeaders -> httpHeaders.putAll(headers)) //.headers(h -> headers.forEach(h::add)) .retrieve() .onStatus(HttpStatus::isError, clientResponse -> Mono.error(new RuntimeException("HTTP error status: " + clientResponse.statusCode()))) .bodyToMono(String.class)) .onErrorResume(error -> Mono.just("Error: " + error.getMessage())) // 如果有错误,返回错误信息 .toFuture(); } /** * 发起POST请求,支持JSON body */ public static CompletableFuture postJson(String url, Object body, HashMap headers) { return Mono.from(WEB_CLIENT.post() .uri(url) .contentType(MediaType.APPLICATION_JSON) .headers(h -> headers.forEach(h::add)) .bodyValue(body) .retrieve() .onStatus(HttpStatus::isError, clientResponse -> Mono.error(new RuntimeException("HTTP error status: " + clientResponse.statusCode()))) .bodyToMono(String.class)) .onErrorResume(error -> Mono.just("Error: " + error.getMessage())) // 如果有错误,返回错误信息 .toFuture(); } /** * 发起POST请求,支持表单数据 */ public static CompletableFuture postForm(String url, MultiValueMap formData, Map headers) { return Mono.from(WEB_CLIENT.post() .uri(url) .headers(h -> headers.forEach(h::add)) .contentType(MediaType.APPLICATION_FORM_URLENCODED) .body(BodyInserters.fromFormData(formData)) .retrieve() .bodyToMono(String.class)) .toFuture(); } //========================================================================================================================================= // 其他请求方式二 public Mono getRequest(String url, long timeoutSeconds, int retryCount) { return executeRequest(url, HttpMethod.GET, null, HttpHeaders.EMPTY, timeoutSeconds, retryCount); } public Mono postRequest(String url, Object requestBody, HttpHeaders headers, long timeoutSeconds, int retryCount) { return executeRequest(url, HttpMethod.POST, requestBody, headers, timeoutSeconds, retryCount); } /** * 执行请求 * * @param url 网址 * @param method 方法 * @param requestBody 请求正文 * @param headers 头 * @param timeoutSeconds 超时秒数 * @param retryCount 重试计数 * @return {@link Mono}<{@link String}> */ private Mono executeRequest(String url, HttpMethod method, Object requestBody, HttpHeaders headers, long timeoutSeconds, int retryCount) { return executeRequestInternal(url, method, requestBody, headers, timeoutSeconds, retryCount) .onErrorResume(throwable -> { logger.error("Error during request: {}", throwable.getMessage()); return Mono.error(throwable); }); } /** * 内部执行请求 * * @param url 网址 * @param method 方法 * @param requestBody 请求正文 * @param headers 头 * @param timeoutSeconds 超时秒数 * @param retryCount 重试计数 * @return {@link Mono}<{@link String}> */ private Mono executeRequestInternal(String url, HttpMethod method, Object requestBody, HttpHeaders headers, long timeoutSeconds, int retryCount) { return webClient.method(method) .uri(url) .headers(httpHeaders -> httpHeaders.addAll(headers)) .bodyValue(requestBody) .retrieve() .bodyToMono(String.class) .timeout(Duration.ofSeconds(timeoutSeconds)) .doOnError(error -> logger.error("Error during request: {}", error)) .retry(retryCount); } //========================================================================================================================================= // 其他参数 /** * 使用超时获取 * * @param url 端点 * @param timeoutSeconds 超时秒数 * @return {@link Mono}<{@link String}> */ public Mono getWithTimeout(String url, long timeoutSeconds) { return webClient.get() .uri(url) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(String.class) .timeout(Duration.ofSeconds(timeoutSeconds)) .onErrorMap(error -> new CustomWebClientException("Request timeout", error)); } /** * 带标题帖子 * * @param url 端点 * @param requestBody 请求正文 * @param headers 头 * @return {@link Mono}<{@link String}> */ public Mono postWithHeaders(String url, Object requestBody, HttpHeaders headers) { return webClient.post() .uri(url) .bodyValue(requestBody) .headers(httpHeaders -> httpHeaders.addAll(headers)) .retrieve() .bodyToMono(String.class); } /** * 获取并发 * * @param endpoint1 端点 1 * @param endpoint2 端点 2 * @return {@link Mono}<{@link String}> */ public Mono getConcurrently(String endpoint1, String endpoint2) { Mono result1 = webClient.get() .uri(endpoint1) .retrieve() .bodyToMono(String.class); Mono result2 = webClient.get() .uri(endpoint2) .retrieve() .bodyToMono(String.class); return result1.zipWith(result2).map(tuple -> tuple.getT1() + tuple.getT2()); } //========================================================================================================================================== /** * 自定义 Web 客户端异常 * Custom exception class for WebClient error handling * * @author xingjinshuang@smartearth.cn * @date 2024/06/27 */ public static class CustomWebClientException1 extends RuntimeException { public CustomWebClientException1(String message) { super(message); } } /** * 自定义 Web 客户端异常 * * @author xingjinshuang * @date 2024/06/27 */ public static class CustomWebClientException extends RuntimeException { public CustomWebClientException(String message, Throwable cause) { super(message, cause); } } /** * 主要 * * @param args 参数 */ public static void main(String[] args) { HashMap headers = new HashMap<>(); headers.put("Content-Type", "application/json"); Mono res = getAndParse("https://api.example.com/data", String.class); res.subscribe(dataResponse -> { // 处理数据响应 System.out.println("Received data response: " + dataResponse); }); Mono res1 = get("https://api.example.com/textdata"); res1.subscribe(textData -> { // 处理文本数据响应 System.out.println("Received text data: " + textData); }); String requestBody00 = new String("test"); Mono res2 = postAndReceiveLocation("https://api.example.com/resource", requestBody00); res2.subscribe(location -> { // 处理返回的资源位置 System.out.println("Resource location: " + location); }); MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("key1", "value1"); formData.add("key2", "value2"); Mono res3 = postFormAndReceiveLocation("https://api.example.com/formsubmit", formData); res3.subscribe(location -> { // 处理返回的表单提交位置 System.out.println("Form submission location: " + location); }); // 异步GET请求,通过subscribe方法来处理响应 Mono asyncResponse0 = getAsMono("/api/resource", String.class); // asyncResponse0.subscribe(System.out::println); asyncResponse0.flatMap(response -> { System.out.println("GET请求结果:" + response); return Mono.just(response); }).subscribe(); // 异步POST请求,通过subscribe方法来处理响应 String requestBody0 = new String("data"); Mono asyncPostedResponse0 = postAsMono("/api/resource", requestBody0, String.class); // asyncPostedResponse0.subscribe(System.out::println); asyncPostedResponse0.flatMap(response -> { System.out.println("POST请求结果:" + response); return Mono.just(response); }).subscribe(); // 异步GET请求,不会直接返回返回体 CompletableFuture asyncResponse = getAsFuture("/api/resource", String.class); asyncResponse.thenAccept(response -> { System.out.println("GET请求结果:" + response); }); // 异步POST请求,不会直接返回返回体 String requestBody = new String("data"); CompletableFuture asyncPostedResponse = postAsFuture("/api/resource", requestBody, headers, String.class); asyncPostedResponse.thenAccept(response -> { System.out.println("POST请求结果:" + response); }); // henAccept方法是一个消费型的方法,它不会返回任何值。 // 要获取异步请求的返回值,可以使用thenApply方法,这个方法会返回一个新的CompletableFuture对象,里面包含经过处理后的返回值 // 异步GET请求,返回返回体 CompletableFuture asyncResponseRes = getAsFuture("/api/resource", String.class); asyncResponse.thenApply(response -> { System.out.println("GET请求结果:" + response); return response; }); // henAccept方法是一个消费型的方法,它不会返回任何值。 // 要获取异步请求的返回值,可以使用thenApply方法,这个方法会返回一个新的CompletableFuture对象,里面包含经过处理后的返回值 // 异步POST请求,返回返回体 String requestBody1 = new String("data"); CompletableFuture asyncPostedResponseRes = postAsFuture("/api/resource", requestBody1, headers, String.class); asyncPostedResponse.thenApply(response -> { System.out.println("POST请求结果:" + response); return response; }); // 同步方式下获取响应体,可以使用join方法来等待异步操作的完成并获取最终的结果。这样可以确保在获取结果之前阻塞当前线程,直到异步操作完成。 // 使用join方法来同步获取响应体: String requestBody2 = new String("data"); CompletableFuture asyncPostedResponse2 = CustomWebClient.postAsFuture("/api/resource", requestBody2, headers, String.class); asyncPostedResponse2.thenAccept(response -> { System.out.println("POST请求结果:" + response); }); String syncResponse = asyncPostedResponse2.join(); System.out.println("同步获取的响应体:" + syncResponse); // 防止主线程提前结束 try { // 等待异步请求完成 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }