diff --git a/components/esp_websocket_client/CMakeLists.txt b/components/esp_websocket_client/CMakeLists.txt index 3e93b839a4..6f80d65b92 100644 --- a/components/esp_websocket_client/CMakeLists.txt +++ b/components/esp_websocket_client/CMakeLists.txt @@ -8,14 +8,20 @@ if(NOT CONFIG_WS_TRANSPORT AND NOT CMAKE_BUILD_EARLY_EXPANSION) return() endif() +set(PRIV_INCLUDE_LIST esp_timer) + +if(CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION AND NOT CMAKE_BUILD_EARLY_EXPANSION) + set(PRIV_INCLUDE_LIST ${PRIV_INCLUDE_LIST} zlib) +endif () + if(${IDF_TARGET} STREQUAL "linux") idf_component_register(SRCS "esp_websocket_client.c" INCLUDE_DIRS "include" REQUIRES esp-tls tcp_transport http_parser esp_event nvs_flash esp_stubs json - PRIV_REQUIRES esp_timer) + PRIV_REQUIRES ${PRIV_INCLUDE_LIST}) else() idf_component_register(SRCS "esp_websocket_client.c" INCLUDE_DIRS "include" REQUIRES lwip esp-tls tcp_transport http_parser esp_event - PRIV_REQUIRES esp_timer) + PRIV_REQUIRES ${PRIV_INCLUDE_LIST}) endif() diff --git a/components/esp_websocket_client/Kconfig b/components/esp_websocket_client/Kconfig index 289c1725d8..76f8febfa8 100644 --- a/components/esp_websocket_client/Kconfig +++ b/components/esp_websocket_client/Kconfig @@ -20,4 +20,11 @@ menu "ESP WebSocket client" default 2000 help Timeout for acquiring the TX lock when using separate TX lock. + + config ESP_WS_CLIENT_ENABLE_COMPRESSION + bool "Enable per-message compression support (RFC7692)" + default n + help + Enable this option will build with zlib, and support RFC7692 per-message DEFLATE compression. + endmenu diff --git a/components/esp_websocket_client/esp_websocket_client.c b/components/esp_websocket_client/esp_websocket_client.c index 87c3a7e6fa..705133ba4e 100644 --- a/components/esp_websocket_client/esp_websocket_client.c +++ b/components/esp_websocket_client/esp_websocket_client.c @@ -23,6 +23,11 @@ #include #include +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION +#include +#endif + + static const char *TAG = "websocket_client"; #define WEBSOCKET_TCP_DEFAULT_PORT (80) @@ -113,6 +118,14 @@ typedef struct { const char *cert_common_name; esp_err_t (*crt_bundle_attach)(void *conf); esp_transport_handle_t ext_transport; +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + bool per_msg_compress; + int per_msg_client_deflate_window_bit; + int per_msg_server_deflate_window_bit; + bool per_msg_server_no_ctx_takeover; + bool per_msg_client_no_ctx_takeover; + int per_msg_compress_level; +#endif } websocket_config_storage_t; typedef enum { @@ -156,6 +169,13 @@ struct esp_websocket_client { int payload_offset; esp_transport_keep_alive_t keep_alive_cfg; struct ifreq *if_name; + +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + char *decompress_buffer; + z_stream compress_tx_stream; // Client-to-server compression stream + z_stream compress_rx_stream; // Server-to-client compression stream +#endif + }; static uint64_t _tick_get_ms(void) @@ -180,6 +200,14 @@ static esp_err_t esp_websocket_new_buf(esp_websocket_client_handle_t client, boo client->rx_buffer = calloc(1, client->buffer_size); ESP_WS_CLIENT_MEM_CHECK(TAG, client->rx_buffer, return ESP_ERR_NO_MEM); + +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->decompress_buffer) { + free(client->decompress_buffer); + } +#endif + client->decompress_buffer = calloc(1, client->buffer_size); + ESP_WS_CLIENT_MEM_CHECK(TAG, client->decompress_buffer, return ESP_ERR_NO_MEM); } #endif return ESP_OK; @@ -198,6 +226,13 @@ static void esp_websocket_free_buf(esp_websocket_client_handle_t client, bool is free(client->rx_buffer); client->rx_buffer = NULL; } + +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->decompress_buffer) { + free(client->decompress_buffer); + client->decompress_buffer = NULL; + } +#endif } #endif } @@ -243,8 +278,15 @@ static esp_err_t esp_websocket_client_dispatch_event(esp_websocket_client_handle static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_handle_t client, esp_websocket_error_type_t error_type) { - ESP_WS_CLIENT_STATE_CHECK(TAG, client, return ESP_FAIL); esp_transport_close(client->transport); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->compress_tx_stream.state) { + deflateEnd(&client->compress_tx_stream); + } + if (client->compress_rx_stream.state) { + inflateEnd(&client->compress_rx_stream); + } +#endif if (!client->config->auto_reconnect) { client->run = false; @@ -460,6 +502,9 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client) #endif free(client->tx_buffer); free(client->rx_buffer); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + free(client->decompress_buffer); +#endif free(client->errormsg_buffer); if (client->status_bits) { vEventGroupDelete(client->status_bits); @@ -494,7 +539,14 @@ static esp_err_t set_websocket_transport_optional_settings(esp_websocket_client_ .user_agent = client->config->user_agent, .headers = client->config->headers, .auth = client->config->auth, - .propagate_control_frames = true + .propagate_control_frames = true, +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + .per_msg_compress = client->config->per_msg_compress, + .per_msg_client_deflate_window_bit = client->config->per_msg_client_deflate_window_bit, + .per_msg_client_no_ctx_takeover = client->config->per_msg_client_no_ctx_takeover, + .per_msg_server_deflate_window_bit = client->config->per_msg_server_deflate_window_bit, + .per_msg_server_no_ctx_takeover = client->config->per_msg_server_no_ctx_takeover, +#endif }; return esp_transport_ws_set_config(trans, &config); } @@ -640,6 +692,74 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand } #endif +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if ((opcode & WS_TRANSPORT_OPCODES_COMPRESSED) && client->config->per_msg_compress && len > 0) { + ws_transport_opcodes_t current_opcode = opcode; + current_opcode &= ~WS_TRANSPORT_OPCODES_FIN; // FIN will be set on the last frame + + if (esp_websocket_new_buf(client, true) != ESP_OK) { + ESP_LOGE(TAG, "Failed to setup tx buffer"); + ret = -1; + goto unlock_and_return; + } + + client->compress_tx_stream.avail_in = len; + client->compress_tx_stream.next_in = (Bytef *)data; + + do { + client->compress_tx_stream.avail_out = client->buffer_size; + client->compress_tx_stream.next_out = (Bytef *)client->tx_buffer; + + int flush = (client->compress_tx_stream.avail_in > 0) ? Z_NO_FLUSH : Z_SYNC_FLUSH; + int deflate_ret = deflate(&client->compress_tx_stream, flush); + if (deflate_ret != Z_OK) { + esp_websocket_client_error(client, "Failed to deflate data (%d)", deflate_ret); + ret = -1; + esp_websocket_free_buf(client, true); + goto unlock_and_return; + } + + int compressed_len = client->buffer_size - client->compress_tx_stream.avail_out; + + if (flush == Z_SYNC_FLUSH) { + if (compressed_len >= 4 && + (uint8_t)client->tx_buffer[compressed_len - 4] == 0x00 && + (uint8_t)client->tx_buffer[compressed_len - 3] == 0x00 && + (uint8_t)client->tx_buffer[compressed_len - 2] == 0xff && + (uint8_t)client->tx_buffer[compressed_len - 1] == 0xff) { + compressed_len -= 4; + } + } + + if (compressed_len > 0) { + ws_transport_opcodes_t send_opcode = current_opcode; + bool is_last_frame = (client->compress_tx_stream.avail_in == 0 && client->compress_tx_stream.avail_out != 0); + if (is_last_frame && contained_fin) { + send_opcode |= WS_TRANSPORT_OPCODES_FIN; + } + + wlen = esp_transport_ws_send_raw(client->transport, send_opcode, (char *)client->tx_buffer, compressed_len, + (timeout == portMAX_DELAY) ? -1 : timeout * portTICK_PERIOD_MS); + if (wlen < 0 || (wlen == 0 && compressed_len != 0)) { + ret = wlen; + esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT); + esp_websocket_free_buf(client, true); + goto unlock_and_return; + } + current_opcode = WS_TRANSPORT_OPCODES_CONT | (opcode & WS_TRANSPORT_OPCODES_COMPRESSED); + } + } while (client->compress_tx_stream.avail_in > 0 || client->compress_tx_stream.avail_out == 0); + + if (client->config->per_msg_client_no_ctx_takeover) { + deflateReset(&client->compress_tx_stream); + } + + esp_websocket_free_buf(client, true); + ret = len; + goto unlock_and_return; + } +#endif + if (esp_websocket_new_buf(client, true) != ESP_OK) { ESP_LOGE(TAG, "Failed to setup tx buffer"); goto unlock_and_return; @@ -766,6 +886,15 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie client->config->crt_bundle_attach = config->crt_bundle_attach; client->config->ext_transport = config->ext_transport; +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + client->config->per_msg_compress = config->per_msg_compress; + client->config->per_msg_client_deflate_window_bit = config->per_msg_client_deflate_window_bit; + client->config->per_msg_server_deflate_window_bit = config->per_msg_server_deflate_window_bit; + client->config->per_msg_client_no_ctx_takeover = config->per_msg_client_no_ctx_takeover; + client->config->per_msg_server_no_ctx_takeover = config->per_msg_server_no_ctx_takeover; + client->config->per_msg_compress_level = config->per_msg_compress_level; +#endif + if (config->uri) { if (esp_websocket_client_set_uri(client, config->uri) != ESP_OK) { ESP_LOGE(TAG, "Invalid uri"); @@ -806,6 +935,12 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie ESP_WS_CLIENT_MEM_CHECK(TAG, client->tx_buffer, { goto _websocket_init_fail; }); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + client->decompress_buffer = malloc(buffer_size); + ESP_WS_CLIENT_MEM_CHECK(TAG, client->decompress_buffer, { + goto _websocket_init_fail; + }); +#endif #endif client->status_bits = xEventGroupCreate(); ESP_WS_CLIENT_MEM_CHECK(TAG, client->status_bits, { @@ -1001,20 +1136,92 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client) client->payload_len = esp_transport_ws_get_read_payload_len(client->transport); client->last_fin = esp_transport_ws_get_fin_flag(client->transport); client->last_opcode = esp_transport_ws_get_read_opcode(client->transport); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->config->per_msg_compress && esp_transport_ws_get_rsv1_flag(client->transport)) { + client->last_opcode |= WS_TRANSPORT_OPCODES_COMPRESSED; + } +#endif - if (rlen == 0 && client->last_opcode == WS_TRANSPORT_OPCODES_NONE) { + if (rlen == 0 && (client->last_opcode & ~WS_TRANSPORT_OPCODES_COMPRESSED) == WS_TRANSPORT_OPCODES_NONE) { ESP_LOGV(TAG, "esp_transport_read timeouts"); esp_websocket_free_buf(client, false); return ESP_OK; } +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->config->per_msg_compress && (client->last_opcode & WS_TRANSPORT_OPCODES_COMPRESSED)) { + client->compress_rx_stream.avail_in = rlen; + client->compress_rx_stream.next_in = (Bytef *)client->rx_buffer; + + while (client->compress_rx_stream.avail_in > 0) { + client->compress_rx_stream.avail_out = client->buffer_size; + client->compress_rx_stream.next_out = (Bytef *)client->decompress_buffer; + int ret = inflate(&client->compress_rx_stream, Z_NO_FLUSH); + + if (ret != Z_OK && ret != Z_STREAM_END) { + esp_websocket_client_error(client, "inflate failed, error %d", ret); + esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_COMPRESSION); + esp_websocket_free_buf(client, false); + return ESP_FAIL; + } + + int decompressed_len = client->buffer_size - client->compress_rx_stream.avail_out; + if (decompressed_len > 0) { + ws_transport_opcodes_t original_opcode = client->last_opcode; + client->last_opcode &= ~WS_TRANSPORT_OPCODES_COMPRESSED; + esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->decompress_buffer, decompressed_len); + client->last_opcode = original_opcode; + } + } + } else { + esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->rx_buffer, rlen); + } +#else esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->rx_buffer, rlen); +#endif client->payload_offset += rlen; } while (client->payload_offset < client->payload_len); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->config->per_msg_compress && (client->last_opcode & WS_TRANSPORT_OPCODES_COMPRESSED) && client->last_fin) { + const uint8_t tail[] = {0x00, 0x00, 0xff, 0xff}; + client->compress_rx_stream.avail_in = sizeof(tail); + client->compress_rx_stream.next_in = (Bytef *)tail; + + do { + client->compress_rx_stream.avail_out = client->buffer_size; + client->compress_rx_stream.next_out = (Bytef *)client->decompress_buffer; + int ret = inflate(&client->compress_rx_stream, Z_SYNC_FLUSH); + + if (ret != Z_OK && ret != Z_STREAM_END) { + esp_websocket_client_error(client, "inflate with tail failed, error %d", ret); + esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_COMPRESSION); + esp_websocket_free_buf(client, false); + return ESP_FAIL; + } + + int decompressed_len = client->buffer_size - client->compress_rx_stream.avail_out; + if (decompressed_len > 0) { + ws_transport_opcodes_t original_opcode = client->last_opcode; + client->last_opcode &= ~WS_TRANSPORT_OPCODES_COMPRESSED; + esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->decompress_buffer, decompressed_len); + client->last_opcode = original_opcode; + } + if (ret == Z_STREAM_END) { + break; + } + } while (client->compress_rx_stream.avail_out == 0); + + if (client->config->per_msg_server_no_ctx_takeover) { + inflateReset(&client->compress_rx_stream); + } + } +#endif + + ws_transport_opcodes_t opcode = client->last_opcode & ~WS_TRANSPORT_OPCODES_COMPRESSED; // if a PING message received -> send out the PONG, this will not work for PING messages with payload longer than buffer len - if (client->last_opcode == WS_TRANSPORT_OPCODES_PING) { + if (opcode == WS_TRANSPORT_OPCODES_PING) { const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer; ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len); #ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK @@ -1028,9 +1235,9 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client) #ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK xSemaphoreGiveRecursive(client->tx_lock); #endif - } else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) { + } else if (opcode == WS_TRANSPORT_OPCODES_PONG) { client->wait_for_pong_resp = false; - } else if (client->last_opcode == WS_TRANSPORT_OPCODES_CLOSE) { + } else if (opcode == WS_TRANSPORT_OPCODES_CLOSE) { ESP_LOGD(TAG, "Received close frame"); client->state = WEBSOCKET_STATE_CLOSING; } @@ -1119,6 +1326,48 @@ static void esp_websocket_client_task(void *pv) } } #endif + +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + // If the compression is enabled, and the HTTP upgrade handshake is done, then: + // 1. we need to pull back the compression parameters from the transport_ws as the server may expect something different from what we hinted; + // 2. we need to set up the zlib handles. + client->config->per_msg_compress = esp_transport_ws_get_per_msg_compress(client->transport); + if (client->config->per_msg_compress) { + client->config->per_msg_client_no_ctx_takeover = esp_transport_ws_get_per_msg_client_no_ctx_takeover(client->transport); + client->config->per_msg_server_no_ctx_takeover = esp_transport_ws_get_per_msg_server_no_ctx_takeover(client->transport); + client->config->per_msg_client_deflate_window_bit = esp_transport_ws_get_per_msg_client_deflate_window_bit(client->transport); + client->config->per_msg_server_deflate_window_bit = esp_transport_ws_get_per_msg_server_deflate_window_bit(client->transport); + + client->compress_rx_stream.zalloc = Z_NULL; + client->compress_rx_stream.zfree = Z_NULL; + client->compress_rx_stream.opaque = Z_NULL; + int zlib_ret = inflateInit2(&client->compress_rx_stream, + -1 * client->config->per_msg_server_deflate_window_bit); + + if (zlib_ret != Z_OK) { + esp_websocket_client_error(client, "Failed to init rx-bound compression stream (%d)", zlib_ret); + esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_COMPRESSION); + break; + } + + client->compress_tx_stream.zalloc = Z_NULL; + client->compress_tx_stream.zfree = Z_NULL; + client->compress_tx_stream.opaque = Z_NULL; + zlib_ret = deflateInit2(&client->compress_tx_stream, + client->config->per_msg_compress_level, + Z_DEFLATED, + -1 * client->config->per_msg_client_deflate_window_bit, + 1, // Memory level set to 1 for now to save RAM + Z_DEFAULT_STRATEGY); + + if (zlib_ret != Z_OK) { + esp_websocket_client_error(client, "Failed to init tx-bound compression stream (%d)", zlib_ret); + esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_COMPRESSION); + break; + } + } +#endif + ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port); client->state = WEBSOCKET_STATE_CONNECTED; @@ -1249,6 +1498,14 @@ static void esp_websocket_client_task(void *pv) esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_FINISH, NULL, 0); esp_transport_close(client->transport); +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + if (client->compress_tx_stream.state) { + deflateEnd(&client->compress_tx_stream); + } + if (client->compress_rx_stream.state) { + inflateEnd(&client->compress_rx_stream); + } +#endif xEventGroupSetBits(client->status_bits, STOPPED_BIT); client->state = WEBSOCKET_STATE_UNKNOW; if (client->selected_for_destroying == true) { diff --git a/components/esp_websocket_client/idf_component.yml b/components/esp_websocket_client/idf_component.yml index 804ab3bac0..035f5fd7e1 100644 --- a/components/esp_websocket_client/idf_component.yml +++ b/components/esp_websocket_client/idf_component.yml @@ -1,6 +1,8 @@ -version: "1.5.0" +version: 1.5.0 description: WebSocket protocol client for ESP-IDF -url: https://github.com/espressif/esp-protocols/tree/master/components/esp_websocket_client +url: + https://github.com/espressif/esp-protocols/tree/master/components/esp_websocket_client dependencies: idf: - version: ">=5.0" + version: '>=5.0' + espressif/zlib: ^1.3.1 diff --git a/components/esp_websocket_client/include/esp_websocket_client.h b/components/esp_websocket_client/include/esp_websocket_client.h index c125a6e350..de4006b562 100644 --- a/components/esp_websocket_client/include/esp_websocket_client.h +++ b/components/esp_websocket_client/include/esp_websocket_client.h @@ -49,7 +49,10 @@ typedef enum { WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT, WEBSOCKET_ERROR_TYPE_PONG_TIMEOUT, WEBSOCKET_ERROR_TYPE_HANDSHAKE, - WEBSOCKET_ERROR_TYPE_SERVER_CLOSE + WEBSOCKET_ERROR_TYPE_SERVER_CLOSE, +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + WEBSOCKET_ERROR_TYPE_COMPRESSION, +#endif } esp_websocket_error_type_t; /** @@ -136,6 +139,19 @@ typedef struct { size_t ping_interval_sec; /*!< Websocket ping interval, defaults to 10 seconds if not set */ struct ifreq *if_name; /*!< The name of interface for data to go through. Use the default interface without setting */ esp_transport_handle_t ext_transport; /*!< External WebSocket tcp_transport handle to the client; or if null, the client will create its own transport handle. */ + +#ifdef CONFIG_ESP_WS_CLIENT_ENABLE_COMPRESSION + bool per_msg_compress; /*!< Enable per-message compression (RFC7692) */ + int per_msg_client_deflate_window_bit; /*!< Hint the server Per-message deflate window bit 8 to 15; or leave 0 to let server decide */ + int per_msg_server_deflate_window_bit; /*!< Hint the server Per-message deflate window bit 8 to 15; or leave 0 to let server decide */ + bool per_msg_server_no_ctx_takeover; /*!< Hint the server to reset the compression stream on every WS frame on server side + * True for a safer transfer, false for better performance */ + bool per_msg_client_no_ctx_takeover; /*!< Hint the server to reset the compression stream on every WS frame on client side + * True for a safer transfer, false for better performance */ + int per_msg_compress_level; /*!< Compression level for zlib DEFLATE, from 0 to 9 + * 0 means no compression (just copy), 1 means fastest compression, 9 means best compression */ +#endif + } esp_websocket_client_config_t; /**