diff --git a/.gitmodules b/.gitmodules index 393784b6..762fbcbd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,3 +2,6 @@ [submodule "dev/OpenOCD"] path = dev/OpenOCD url = https://github.com/STMicroelectronics/OpenOCD.git +[submodule "dev/nanopb/nanopb"] + path = dev/nanopb/nanopb + url = https://github.com/nanopb/nanopb.git diff --git a/NetX/inc/u_nx_ethernet.h b/NetX/inc/u_nx_ethernet.h index 95afd7e9..1f123241 100644 --- a/NetX/inc/u_nx_ethernet.h +++ b/NetX/inc/u_nx_ethernet.h @@ -17,6 +17,11 @@ #define ETH_MESSAGE_SIZE 128 /* Maximum ethernet message size in bytes. */ #define ETH_MAX_PACKETS 10 /* Maximum number of packets we wanna handle simultaneously */ #define ETH_NUMBER_OF_NODES 8 /* Number of nodes in the network. */ +#define ETH_ENABLE_MANUAL_UDP_MULTICAST 0 // whether to enable UDP multicast +#define ETH_ENABLE_IGMP 1 // whether to enable IGMP +#define ETH_ENABLE_MQTT 1 // whether to use a MQTT connection +#define ETH_MQTT_SERVER_IP IP_ADDRESS(10,0,0,1) // the server address of the broker (TPU usually) +#define ETH_MQTT_SERVER_PORT 1883 typedef enum { TPU = (1 << 0), // 0b00000001 @@ -61,7 +66,12 @@ typedef struct __attribute__((__packed__)) { /* Function Pointers (for initialization). */ typedef void (*DriverFunction)(NX_IP_DRIVER *); /* User-supplied network driver used to send and receive IP packets. */ + +#if ETH_ENABLE_MANUAL_UDP_MULTICAST +typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function that will be called whenever an ethernet message is recieved. */ +#else typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function that will be called whenever an ethernet message is recieved. */ +#endif /** * @brief Initializes the NetX ethernet system in a repo. @@ -70,8 +80,9 @@ typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function * @param on_recieve User-supplied function to be called whenever an ethernet message is recieved. The function's only parameter is an ethernet_message_t instance containing the recieved message. * @return Status. */ -uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve); +UINT ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve); +#if ETH_ENABLE_MANUAL_UDP_MULTICAST /** * @brief Creates an ethernet message. Can be send with ethernet_send_message(), or added to a queue. * @param recipient_id The ID of the recipient node. @@ -88,12 +99,54 @@ ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t r * @return Status. */ uint8_t ethernet_send_message(ethernet_message_t *message); +#endif + +#if ETH_ENABLE_MQTT +/** + * @brief Sends a MQTT message to outgoing queue + * @param topic_name The topic name + * @param topic_size The topic size in bytes + * @param message The data to send + * @param message_size The message size in bytes + * @return The error code. + */ +UINT ethernet_mqtt_publish(char* topic_name, UINT topic_size, char* message, UINT message_size); + +/** + * Connect to a disconnected MQTT server (NX_MQTT_NOT_CONNECTED) + * Will yield while trying to connect + */ +UINT ethernet_mqtt_reconnect(void); + +/** + * + */ +//ethernet_mqtt_subscribe(); + +/** + * + */ + +#endif /** * @brief Retrieves the time from PTP stack. - * @return The UTC time + * @param datetime Buffer for the retrieved datetime info. + * @return U_SUCCESS if successful, U_ERROR is not successful. + */ +int ethernet_get_time(NX_PTP_DATE_TIME* datetime); + +/** + * @brief Gets the number of microseconds since the Unix epoch (1970-01-01 00:00:00 UTC), using the PTP stack. + * @param buffer The buffer for the retrieved time. + * @return U_SUCCESS if successful, U_ERROR is not successful. + */ +int ethernet_ptp_get_unix_microseconds(uint64_t* buffer); + +/** + * Debugging, print the status of ARP statistics */ -NX_PTP_DATE_TIME ethernet_get_time(void); +UINT ethernent_print_arp_status(void); // clang-format on #endif /* u_nx_ethernet.h */ diff --git a/NetX/inc/u_nx_protobuf.h b/NetX/inc/u_nx_protobuf.h new file mode 100644 index 00000000..3c88e650 --- /dev/null +++ b/NetX/inc/u_nx_protobuf.h @@ -0,0 +1,83 @@ +#pragma once + +// clang-format off + +/* +* Wrapper for structuring and sending protobuf Ethernet messages over MQTT. +* This API sits above the lower-level u_nx_ethernet.h driver layer. Messages with nonstandard formats can still be sent using the u_nx_ethernet.h API directly. +*/ + +#include +#include +#include "serverdata.pb.h" + +/* Helper macros. */ +#define PB_STR_HELPER(x) #x // Helper for PB_TOSTR(). Probably should never use directly. +#define PB_TOSTR(x) PB_STR_HELPER(x) // Converts a macro's value into a string. +#define PB_COUNT_ARGS(...) (sizeof((float[]){ __VA_ARGS__ }) / sizeof(float)) // Returns the number of arguments passed into it. +#define PB_STR_LEN(s) (sizeof(s) - 1) // Returns the length of a string literal. + +/* CONFIG: Compile-time validation of topic size, unit size, and number of values. */ +#define PB_MAX_TOPIC_LENGTH 100 // Maximum length of topic string literal (in characters). +#define PB_MAX_UNIT_LENGTH 15 // Maximum length of unit string literal (in characters). +#define PB_MIN_DATAPOINTS 1 // Minimum number of datapoints (i.e., variable `...` arguments passed into `nx_protobuf_mqtt_message_create()`). +#define PB_MAX_DATAPOINTS 5 // Maximum number of datapoints (i.e., variable `...` arguments passed into `nx_protobuf_mqtt_message_create()`). +#define PB_VALIDATE_ARGS(topic, unit, num_values) \ + do { \ + _Static_assert( \ + PB_STR_LEN(topic) <= PB_MAX_TOPIC_LENGTH, \ + "MQTT topic parameter exceeds maximum length of " PB_TOSTR(PB_MAX_TOPIC_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`."\ + ); \ + _Static_assert( \ + PB_STR_LEN(unit) <= PB_MAX_UNIT_LENGTH, \ + "MQTT unit parameter exceeds maximum length of " PB_TOSTR(PB_MAX_UNIT_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`." \ + ); \ + _Static_assert( \ + (num_values) >= PB_MIN_DATAPOINTS, \ + "Must pass at least " PB_TOSTR(PB_MIN_DATAPOINTS) " value into the variable argument of `nx_protobuf_mqtt_message_create()`." \ + ); \ + _Static_assert( \ + (num_values) <= PB_MAX_DATAPOINTS, \ + "Cannot pass more than " PB_TOSTR(PB_MAX_DATAPOINTS) " values into the variable argument of `nx_protobuf_mqtt_message_create()`." \ + ); \ + } while (0) + +/** + * @brief Creates and formats a `ethernet_mqtt_message_t` object, and returns it to the caller. + * @param topic (const char*) String literal representing the message's MQTT topic name. + * @param unit (const char*) String literal representing the unit of the message's data. + * @param ... (float) The data to be sent in the message. This is a variable argument, so it can be repeated depending on how many datapoints you want to send. If you pass in more datapoints than allowed, you will get a compile-time error. + * @return An `ethernet_mqtt_message_t` object. + * @note If message creation was not completed for any reason, .initialized will be false in the returned `ethernet_mqtt_message_t` object. You may still use the object as you please (including attempting to initialize it again), but attempting to send the message (via `nx_protobuf_mqtt_message_send()`) will return an error. + */ +#define nx_protobuf_mqtt_message_create(topic, unit, ...) \ + ({ \ + PB_VALIDATE_ARGS(topic, unit, PB_COUNT_ARGS(__VA_ARGS__)); \ + _nx_protobuf_mqtt_message_create( \ + (topic), PB_STR_LEN(topic), \ + (unit), PB_STR_LEN(unit), \ + (float[]){ __VA_ARGS__ }, \ + PB_COUNT_ARGS(__VA_ARGS__) \ + ); \ + }) + + +/* Ethernet MQTT Message. */ +typedef struct { + const char* topic; + int topic_size; + serverdata_v2_ServerData protobuf; + bool initialized; +} ethernet_mqtt_message_t; + +/** + * @brief Dispatches a `ethernet_mqtt_message_t` message over MQTT. + * @param message The message to send. + * @return U_SUCCESS if successful, U_ERROR is not successful. + */ +int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message); + +/* MACRO IMPLEMENTATIONS */ +ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_size, const char* unit, size_t unit_len, const float values[], int values_count); + +// clang-format on \ No newline at end of file diff --git a/NetX/src/u_nx_debug.c b/NetX/src/u_nx_debug.c index 25808ed8..4add549d 100644 --- a/NetX/src/u_nx_debug.c +++ b/NetX/src/u_nx_debug.c @@ -1,4 +1,16 @@ #include "u_nx_debug.h" +#include "nxd_ptp_client.h" + +#if defined(__has_include) +#if __has_include("nxd_mqtt_client.h") +#include "nxd_mqtt_client.h" +#define U_NX_DEBUG_HAS_MQTT 1 +#endif +#endif + +#ifndef U_NX_DEBUG_HAS_MQTT +#define U_NX_DEBUG_HAS_MQTT 0 +#endif // clang-format off @@ -72,6 +84,45 @@ const char* nx_status_toString(UINT status) { case NX_OPTION_HEADER_ERROR: return "NX_OPTION_HEADER_ERROR"; case NX_CONTINUE: return "NX_CONTINUE"; case NX_TCPIP_OFFLOAD_ERROR: return "NX_TCPIP_OFFLOAD_ERROR"; + + /* MQTT-specific stuff. */ +#if U_NX_DEBUG_HAS_MQTT +#if (NXD_MQTT_SUCCESS != NX_SUCCESS) + case NXD_MQTT_SUCCESS: return "NXD_MQTT_SUCCESS"; +#endif + case NXD_MQTT_ALREADY_CONNECTED: return "NXD_MQTT_ALREADY_CONNECTED"; + case NXD_MQTT_NOT_CONNECTED: return "NXD_MQTT_NOT_CONNECTED"; + case NXD_MQTT_MUTEX_FAILURE: return "NXD_MQTT_MUTEX_FAILURE"; + case NXD_MQTT_INTERNAL_ERROR: return "NXD_MQTT_INTERNAL_ERROR"; + case NXD_MQTT_CONNECT_FAILURE: return "NXD_MQTT_CONNECT_FAILURE"; + case NXD_MQTT_PACKET_POOL_FAILURE: return "NXD_MQTT_PACKET_POOL_FAILURE"; + case NXD_MQTT_COMMUNICATION_FAILURE: return "NXD_MQTT_COMMUNICATION_FAILURE"; + case NXD_MQTT_SERVER_MESSAGE_FAILURE: return "NXD_MQTT_SERVER_MESSAGE_FAILURE"; + case NXD_MQTT_INVALID_PARAMETER: return "NXD_MQTT_INVALID_PARAMETER"; + case NXD_MQTT_NO_MESSAGE: return "NXD_MQTT_NO_MESSAGE"; + case NXD_MQTT_PACKET_POOL_EMPTY: return "NXD_MQTT_PACKET_POOL_EMPTY"; + case NXD_MQTT_QOS2_NOT_SUPPORTED: return "NXD_MQTT_QOS2_NOT_SUPPORTED"; + case NXD_MQTT_INSUFFICIENT_BUFFER_SPACE: return "NXD_MQTT_INSUFFICIENT_BUFFER_SPACE"; + case NXD_MQTT_CLIENT_NOT_RUNNING: return "NXD_MQTT_CLIENT_NOT_RUNNING"; + case NXD_MQTT_INVALID_PACKET: return "NXD_MQTT_INVALID_PACKET"; + case NXD_MQTT_PARTIAL_PACKET: return "NXD_MQTT_PARTIAL_PACKET"; + case NXD_MQTT_CONNECTING: return "NXD_MQTT_CONNECTING"; + case NXD_MQTT_INVALID_STATE: return "NXD_MQTT_INVALID_STATE"; + case NXD_MQTT_ERROR_CONNECT_RETURN_CODE: return "NXD_MQTT_ERROR_CONNECT_RETURN_CODE"; + case NXD_MQTT_ERROR_UNACCEPTABLE_PROTOCOL: return "NXD_MQTT_ERROR_UNACCEPTABLE_PROTOCOL"; + case NXD_MQTT_ERROR_IDENTIFYIER_REJECTED: return "NXD_MQTT_ERROR_IDENTIFYIER_REJECTED"; + case NXD_MQTT_ERROR_SERVER_UNAVAILABLE: return "NXD_MQTT_ERROR_SERVER_UNAVAILABLE"; + case NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD: return "NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD"; + case NXD_MQTT_ERROR_NOT_AUTHORIZED: return "NXD_MQTT_ERROR_NOT_AUTHORIZED"; +#endif + + /* PTP-specific stuff. */ + case NX_PTP_CLIENT_NOT_STARTED: return "NX_PTP_CLIENT_NOT_STARTED"; + case NX_PTP_CLIENT_ALREADY_STARTED: return "NX_PTP_CLIENT_ALREADY_STARTED"; + case NX_PTP_PARAM_ERROR: return "NX_PTP_PARAM_ERROR"; + case NX_PTP_CLIENT_INSUFFICIENT_PACKET_PAYLOAD: return "NX_PTP_CLIENT_INSUFFICIENT_PACKET_PAYLOAD"; + case NX_PTP_CLIENT_CLOCK_CALLBACK_FAILURE: return "NX_PTP_CLIENT_CLOCK_CALLBACK_FAILURE"; + default: return "UNKNOWN_STATUS"; } } diff --git a/NetX/src/u_nx_ethernet.c b/NetX/src/u_nx_ethernet.c index d1a6ee38..dbfbedcc 100644 --- a/NetX/src/u_nx_ethernet.c +++ b/NetX/src/u_nx_ethernet.c @@ -2,49 +2,85 @@ #include "u_nx_ethernet.h" #include "nx_stm32_eth_driver.h" #include "nxd_ptp_client.h" +#include "tx_api.h" #include "u_nx_debug.h" #include "u_tx_debug.h" #include "c_utils.h" #include "nx_api.h" +#include "u_tx_general.h" +#include +#include "serial.h" #include #include +#include +#if ETH_ENABLE_MQTT +#include "nxd_mqtt_client.h" +#endif /* PRIVATE MACROS */ -#define _IP_THREAD_STACK_SIZE 2048 -#define _ARP_CACHE_SIZE 1024 +#define _IP_THREAD_STACK_SIZE 4096 +#define _ARP_CACHE_SIZE 2048 #define _IP_THREAD_PRIORITY 1 #define _IP_NETWORK_MASK IP_ADDRESS(255, 255, 255, 0) #define _UDP_QUEUE_MAXIMUM 12 #define _PTP_THREAD_PRIORITY 2 +#define _MQTT_THREAD_PRIORITY 2 /* The DEFAULT_PAYLOAD_SIZE should match with RxBuffLen configured via MX_ETH_Init */ #define DEFAULT_PAYLOAD_SIZE 1524 -#define NX_APP_PACKET_POOL_SIZE ((DEFAULT_PAYLOAD_SIZE + sizeof(NX_PACKET)) * 10) +#define NX_APP_PACKET_POOL_SIZE ((DEFAULT_PAYLOAD_SIZE + sizeof(NX_PACKET)) * 100) +#define MQTT_CLIENT_STACK_SIZE 8192 + +extern ETH_HandleTypeDef heth; /* DEVICE INFO */ typedef struct { - /* NetX Objects */ - NX_UDP_SOCKET socket; - NX_PACKET_POOL packet_pool; + uint8_t node_id; + NX_IP ip; - NX_PTP_CLIENT ptp_client; - SHORT ptp_utc_offset; + UCHAR ip_memory[_IP_THREAD_STACK_SIZE]; + DriverFunction + driver; /* Set by the user. Used to communicate with the driver layer. */ + OnRecieve on_recieve; /* Set by the user. Called when a message is recieved. */ - /* Static memory for NetX stuff */ + NX_PACKET_POOL packet_pool; UCHAR packet_pool_memory[NX_APP_PACKET_POOL_SIZE]; - UCHAR ip_memory[_IP_THREAD_STACK_SIZE]; + UCHAR arp_cache_memory[_ARP_CACHE_SIZE]; + + #if ETH_ENABLE_MANUAL_UDP_MULTICAST + NX_UDP_SOCKET socket; + #endif + + #if ETH_ENABLE_MQTT + NXD_MQTT_CLIENT mqtt_client; + UCHAR mqtt_thread_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)]; + #endif + + + NX_PTP_CLIENT ptp_client; + SHORT ptp_utc_offset; ULONG ptp_stack[2048 / sizeof(ULONG)]; - /* Device config variables */ bool is_initialized; - uint8_t node_id; - DriverFunction - driver; /* Set by the user. Used to communicate with the driver layer. */ - OnRecieve on_recieve; /* Set by the user. Called when a message is recieved. */ } _ethernet_device_t; static _ethernet_device_t device = { 0 }; +/* CALLBACK FUNCTIONS */ +#if ETH_ENABLE_MQTT +static VOID _mqtt_disconnect_callback(NXD_MQTT_CLIENT *client_ptr) +{ + PRINTLN_WARNING("client disconnected from server\n"); +} + +static VOID _mqtt_recieve_callback(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages) +{ + //tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR); + return; +} +#endif + + /* Callback function. Called when a PTP event is processed. */ // extern UINT ptp_clock_callback(NX_PTP_CLIENT *client_ptr, UINT operation, // NX_PTP_TIME *time_ptr, NX_PACKET *packet_ptr, @@ -85,6 +121,7 @@ static UINT _ptp_event_callback(NX_PTP_CLIENT *ptp_client_ptr, UINT event, VOID } /* Callback function. Called when an ethernet message is received. */ +#if ETH_ENABLE_MANUAL_UDP_MULTICAST static void _receive_message(NX_UDP_SOCKET *socket) { NX_PACKET *packet; ULONG bytes_copied; @@ -130,12 +167,13 @@ static void _receive_message(NX_UDP_SOCKET *socket) { /* Release the packet */ nx_packet_release(packet); } +#endif /* API FUNCTIONS */ -uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve) { +UINT ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve) { - uint8_t status; + UINT status; device.ptp_utc_offset = 0; // no offset to start /* Make sure device isn't already initialized */ @@ -166,8 +204,8 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve status = nx_ip_create( &device.ip, // Pointer to the IP instance "Ethernet IP Instance", // Name - IP_ADDRESS(5, 5, 5, device.node_id), // Dummy unicast IP (we shouldn't have to use this if we're just using multicast for everything) - _IP_NETWORK_MASK, // Network mask + IP_ADDRESS(10, 0, 0, device.node_id), // Unicast IP derived from node_id, 10.0.0.0/24 + _IP_NETWORK_MASK, // Network mask /24 &device.packet_pool, // Pointer to the packet pool device.driver, // Pointer to the Ethernet driver function device.ip_memory, // Pointer to the memory for the IP instance @@ -179,6 +217,15 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve return status; } + // behold the magic + // this is a poll function to wait until the driver has completely initialized + // without it the arp module and the TCP module fail silently and confusingly + ULONG current_status; + do { + nx_ip_driver_direct_command(&device.ip, 51, ¤t_status); + tx_thread_sleep(100); + } while (current_status != 4);// NX_DRIVER_STATE_LINK_ENABLED + /* Enable ARP */ status = nx_arp_enable( &device.ip, // IP instance @@ -191,6 +238,21 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve } + /* Enable igmp */ +#if ETH_ENABLE_IGMP || ETH_ENABLE_MANUAL_UDP_MULTICAST + status = nx_igmp_enable(&device.ip); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } +#endif + + status = nx_icmp_enable(&device.ip); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to enable icmp (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + /* Enable UDP */ status = nx_udp_enable(&device.ip); if (status != NX_SUCCESS) { @@ -198,13 +260,13 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve return status; } - /* Enable igmp */ - status = nx_igmp_enable(&device.ip); + status = nx_tcp_enable(&device.ip); if (status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status)); + PRINTLN_ERROR("Failed to enable TCP (Status: %d/%s).", status, nx_status_toString(status)); return status; } +#if ETH_ENABLE_MANUAL_UDP_MULTICAST /* Set up multicast groups. * (This iterates through every possible node combination between 0b00000001 and 0b11111111. * If any of the combinations include device.node_id, that combination gets added as a multicast group. @@ -224,22 +286,6 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve } } - /* Create the PTP client instance */ - status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool, - _PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack), - _nx_ptp_client_soft_clock_callback, NX_NULL); - if(status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status)); - return status; - } - - /* start the PTP client */ - status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL); - if(status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status)); - return status; - } - /* Create UDP socket for broadcasting */ status = nx_udp_socket_create( &device.ip, // IP instance @@ -278,15 +324,72 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve nx_udp_socket_delete(&device.socket); return status; } +#endif + + /* Create the PTP client instance */ + status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool, + _PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack), + _nx_ptp_client_soft_clock_callback, NX_NULL); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + /* start the PTP client */ + status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + +#if ETH_ENABLE_MQTT +/* Create MQTT client instance. */ + char client_id[8] = ""; + UINT client_id_size = sprintf(client_id, "FWD-%d", device.node_id); + + status = nxd_mqtt_client_create(&device.mqtt_client, "MQTT client", + client_id, client_id_size, &device.ip, &device.packet_pool, + (VOID*)device.mqtt_thread_stack, sizeof(device.mqtt_thread_stack), + _MQTT_THREAD_PRIORITY, NX_NULL, 0); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Register the disconnect notification function. */ + status = nxd_mqtt_client_disconnect_notify_set(&device.mqtt_client, _mqtt_disconnect_callback); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT disconnect notification (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Set the receive notify function. */ + status = nxd_mqtt_client_receive_notify_set(&device.mqtt_client, _mqtt_recieve_callback); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to set mqtt recv notification (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + NXD_ADDRESS server_ip; + server_ip.nxd_ip_version = 4; + server_ip.nxd_ip_address.v4 = ETH_MQTT_SERVER_IP; + /* Start the connection to the server. */ + status = nxd_mqtt_client_connect(&device.mqtt_client, &server_ip, ETH_MQTT_SERVER_PORT, + 1000, NX_TRUE, NX_WAIT_FOREVER); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to connect to MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + } else { + } +#endif /* Mark device as initialized. */ device.is_initialized = true; - PRINTLN_INFO("Ran ethernet_init()."); + PRINTLN_INFO("Ran ethernet_init()"); return NX_SUCCESS; } /* Creates an ethernet message (i.e. returns an ethernet_message_t instance). */ +#if ETH_ENABLE_MANUAL_UDP_MULTICAST ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t recipient_id, uint8_t *data, uint8_t data_length) { ethernet_message_t message = {0}; @@ -391,17 +494,143 @@ uint8_t ethernet_send_message(ethernet_message_t *message) { PRINTLN_INFO("Sent ethernet message (Recipient ID: %d, Message ID: %d, Message Contents: %d).", message->recipient_id, message->message_id, message->data); return U_SUCCESS; } +#endif + +#if ETH_ENABLE_MQTT +UINT ethernet_mqtt_publish(char *topic_name, UINT topic_size, char *message, UINT message_size) { + return nxd_mqtt_client_publish(&device.mqtt_client, topic_name, topic_size-1, message, message_size, NX_FALSE, 0, MS_TO_TICKS(100)); +} + +UINT ethernet_mqtt_reconnect(void) { + NXD_ADDRESS server_ip; + server_ip.nxd_ip_version = 4; + server_ip.nxd_ip_address.v4 = ETH_MQTT_SERVER_IP; + // TODO fix bug that breaks reconnection with 0x10007 + nxd_mqtt_client_delete(&device.mqtt_client); + + char client_id[8] = ""; + UINT client_id_size = sprintf(client_id, "FWD-%d", device.node_id); + + UINT status = nxd_mqtt_client_create(&device.mqtt_client, "MQTT client", + client_id, client_id_size, &device.ip, &device.packet_pool, + (VOID*)device.mqtt_thread_stack, sizeof(device.mqtt_thread_stack), + _MQTT_THREAD_PRIORITY, NX_NULL, 0); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Start the connection to the server. */ + return nxd_mqtt_client_connect(&device.mqtt_client, &server_ip, ETH_MQTT_SERVER_PORT, + 1000, NX_TRUE, NX_WAIT_FOREVER); +} +#endif + +int ethernet_get_time(NX_PTP_DATE_TIME* datetime) { + NX_PTP_TIME tm = { 0 }; + NX_PTP_DATE_TIME dt = { 0 }; + NX_PTP_CLIENT_SYNC sync = { 0 }; + USHORT flags; + + /* If not initialized, don't try to read PTP yet. */ + if(!device.is_initialized) { + PRINTLN_ERROR("Tried getting PTP time before device has been initialized."); + return U_ERROR; + } -NX_PTP_DATE_TIME ethernet_get_time(void) { - NX_PTP_TIME tm; - NX_PTP_DATE_TIME date; /* read the PTP clock */ - nx_ptp_client_time_get(&device.ptp_client, &tm); + int status = nx_ptp_client_time_get(&device.ptp_client, &tm); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to call nx_ptp_client_time_get() (Status: %d/%s).", status, nx_status_toString(status)); + return U_ERROR; + } + + PRINTLN_INFO("ptp nanoseconds: %ld", tm.nanosecond); + + /* Set utc_offset. */ + const SHORT utc_offset = 0; + PRINTLN_INFO("utc offset: %d", utc_offset); /* convert PTP time to UTC date and time */ - nx_ptp_client_utility_convert_time_to_date(&tm, 0, &date); + status = nx_ptp_client_utility_convert_time_to_date(&tm, utc_offset, &dt); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to call nx_ptp_client_utility_convert_time_to_date() (Status: %d/%s).", status, nx_status_toString(status)); + return U_ERROR; + } - return date; + *datetime = dt; + return U_SUCCESS; } +/* Gets the number of microseconds since the Unix epoch (1970-01-01 00:00:00 UTC)*/ +int ethernet_ptp_get_unix_microseconds(uint64_t* buffer) +{ + + NX_PTP_DATE_TIME datetime = { 0 }; + + /* Get PTP datetime. */ + int status = ethernet_get_time(&datetime); + if(status != U_SUCCESS) { + PRINTLN_ERROR("Failed to call ethernet_get_time() (Status: %d).", status); + return U_ERROR; + } + + serial_monitor("datetime", "nanoseconds", "%d", datetime.nanosecond); + serial_monitor("datetime", "year", "%d", datetime.year); + serial_monitor("datetime", "month", "%d", datetime.month); + serial_monitor("datetime", "day", "%d", datetime.day); + + int y = datetime.year; + int m = datetime.month; + int d = datetime.day; + + /* Adjust year and month for March-based counting (simplifies leap year handling) */ + if (m <= 2) + { + y--; + m += 12; + } + + /* Days from epoch (1970-01-01) using the Rata Die algorithm */ + uint32_t days = 365 * y + y / 4 - y / 100 + y / 400 + + (153 * (m - 3) + 2) / 5 + d - 719469; + + uint64_t us = (uint64_t)days * 86400LL * 1000000LL + + (uint64_t)datetime.hour * 3600LL * 1000000LL + + (uint64_t)datetime.minute * 60LL * 1000000LL + + (uint64_t)datetime.second * 1000000LL + + (uint64_t)(datetime.nanosecond / 1000); + + *buffer = us; + serial_monitor("datetime", "microseconds from epoch", "%" PRIu64, us); + return U_SUCCESS; +} + +UINT ethernet_print_arp_status(void) { + ULONG arp_requests_sent = 100; + ULONG arp_requests_received = 100; + + ULONG arp_responses_sent = 100; + ULONG arp_responses_received; + ULONG arp_dynamic_entries= 100; + ULONG arp_static_entries= 100; + ULONG arp_aged_entries= 100; + ULONG arp_invalid_messages= 100; + UINT status = nx_arp_info_get(&device.ip, &arp_requests_sent, &arp_requests_received, + &arp_responses_sent, &arp_responses_received, + &arp_dynamic_entries, &arp_static_entries, + &arp_aged_entries, &arp_invalid_messages); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to retrieve ARP info (Status: %d/%s)", status, nx_status_toString(status)); + return status; + } + PRINTLN_INFO("ARP info REQ SENT %lu, REQ RECV %lu, RESP SENT %lu, RESP RECV %lu, DYN ENTRY %lu, S ENTRY %lu, AGED %lu, INVALID %lu", arp_requests_sent, arp_requests_received, + arp_responses_sent, arp_responses_received, + arp_dynamic_entries, arp_static_entries, + arp_aged_entries, arp_invalid_messages); + + return status; +} + + // clang-format on diff --git a/NetX/src/u_nx_protobuf.c b/NetX/src/u_nx_protobuf.c new file mode 100644 index 00000000..c0bfeacc --- /dev/null +++ b/NetX/src/u_nx_protobuf.c @@ -0,0 +1,123 @@ +#include +#include "u_tx_debug.h" +#include "u_nx_debug.h" +#include "u_nx_protobuf.h" +#include "u_nx_ethernet.h" +#include "pb.h" +#include "pb_encode.h" +#include "tx_api.h" +#include "nxd_mqtt_client.h" + +ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_len, const char* unit, size_t unit_len, const float values[], int values_size) { + /* Zero-initialize the protobuf struct and the sendable ethernet_mqtt_message_t message. */ + serverdata_v2_ServerData protobuf = serverdata_v2_ServerData_init_zero; + ethernet_mqtt_message_t message = { 0 }; + message.initialized = false; + + /* Enforce topic length. */ + if(topic_len > PB_MAX_TOPIC_LENGTH) { + PRINTLN_ERROR("MQTT Message topic exceeds maximum length of %d (Topic: %s, Current Topic Length: %d).", PB_MAX_TOPIC_LENGTH, topic, topic_len); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce unit length. */ + if(unit_len > PB_MAX_UNIT_LENGTH) { + PRINTLN_ERROR("MQTT Unit string length exceeds maximum length of %d (Topic: %s, Current Unit String Length: %d).", PB_MAX_UNIT_LENGTH, topic, unit_len); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce minimum number of datapoints. */ + if(values_size < PB_MIN_DATAPOINTS) { + PRINTLN_ERROR("Message must have at least %d datapoints (Topic: %s, Current values_size: %d).", PB_MIN_DATAPOINTS, topic, values_size); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce maximum number of datapoints. */ + if(values_size > PB_MAX_DATAPOINTS) { + PRINTLN_ERROR("Message cannot have more than %d datapoints (Topic: %s, Current values_size: %d).", PB_MAX_DATAPOINTS, topic, values_size); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Get the PTP time and convert to appropriate protobuf time. */ + uint64_t datetime = 0; + int status = ethernet_ptp_get_unix_microseconds(&datetime); + if(status != U_SUCCESS) { + PRINTLN_ERROR("Failed to get PTP Unix time (Status: %d).", status); + return message; // Return empty, uninitialized message. + } + + + PRINTLN_INFO("got time: %d", datetime); + + /* Pack the protobuf message. */ + // CURRENT PROTOBUF SCHEMA LOOKS LIKE THIS: + // typedef struct _serverdata_v2_ServerData { + // char unit[15]; + // uint64_t time_us; + // pb_size_t values_count; + // float values[5]; + // } serverdata_v2_ServerData; + memcpy(protobuf.unit, unit, unit_len); + protobuf.time_us = datetime; + protobuf.values_count = values_size; + memcpy(protobuf.values, values, values_size * sizeof(float)); + + /* Pack the `ethernet_mqtt_message_t` object and return it as successfully initialized. */ + message.topic = topic; + message.topic_size = topic_len + 1; // u_TODO - for some reason you need to do + 1 here or else it will cut the last letter off of the topic in MQTT ui. The macro probably just calculates the topic length with one less than it should or something + message.protobuf = protobuf; + message.initialized = true; + return message; +} + +int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message) { + /* Make sure message isn't nullptr. */ + if(!message) { + PRINTLN_ERROR("Null pointer to `ethernet_mqtt_message_t` message."); + return U_ERROR; + } + + /* Make sure message is initialized. */ + if(!message->initialized) { + PRINTLN_ERROR("Attempting to send an uninitialized `ethernet_mqtt_message_t` message."); + return U_ERROR; + } + + /* Set up the buffer. */ + unsigned char buffer[serverdata_v2_ServerData_size]; + pb_ostream_t stream = pb_ostream_from_buffer(buffer, sizeof(buffer)); + + /* Encode the protobuf. */ + int status = pb_encode(&stream, serverdata_v2_ServerData_fields, &message->protobuf); + if(status != true) { + PRINTLN_ERROR("Failed to serialize protobuf message (Topic: %s): %s", message->topic, PB_GET_ERROR(&stream)); + return U_ERROR; + } + + /* Publish over MQTT. */ + status = ethernet_mqtt_publish(message->topic, message->topic_size, (char*)buffer, stream.bytes_written); // u_TODO - ethernet_mqtt_publish should return U_SUCCESS/U_ERROR instead of the internal netx error macro + if(status != NXD_MQTT_SUCCESS) { + PRINTLN_WARNING("Failed to publish MQTT message (Topic: %s, Status: %d).", message->topic, status); + + /* If disconnected, attempt reconnection. */ + if(status == NXD_MQTT_NOT_CONNECTED) { + PRINTLN_WARNING("Detected disconnect from MQTT. Attempting reconnection..."); + do { + tx_thread_sleep(1000); + status = ethernet_mqtt_reconnect(); + PRINTLN_WARNING("Attempting MQTT reconnection (Status: %d/%s).", status, nx_status_toString(status)); + } while ((status != NXD_MQTT_SUCCESS) && (status != NXD_MQTT_ALREADY_CONNECTED)); + PRINTLN_WARNING("MQTT reconnection successful."); + } + + return U_ERROR; + } + + /* Return successful! */ + PRINTLN_INFO("Sent MQTT message (Topic: %s).", message->topic); + return U_SUCCESS; +} \ No newline at end of file diff --git a/dev/nanopb/CMakeLists.txt b/dev/nanopb/CMakeLists.txt new file mode 100644 index 00000000..7a531d71 --- /dev/null +++ b/dev/nanopb/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.22) + +set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/nanopb/extra) +find_package(Nanopb REQUIRED) + +nanopb_generate_cpp(TARGET proto serverdata.proto) + +target_link_libraries(${CMAKE_PROJECT_NAME} proto) diff --git a/dev/nanopb/nanopb b/dev/nanopb/nanopb new file mode 160000 index 00000000..c716db13 --- /dev/null +++ b/dev/nanopb/nanopb @@ -0,0 +1 @@ +Subproject commit c716db13070bfb7de03b33f5a6558528cbf8a249 diff --git a/dev/nanopb/serverdata.options b/dev/nanopb/serverdata.options new file mode 100644 index 00000000..77860c7c --- /dev/null +++ b/dev/nanopb/serverdata.options @@ -0,0 +1,2 @@ +serverdata.v2.ServerData.unit max_size: 15 +serverdata.v2.ServerData.values max_count: 5 diff --git a/dev/nanopb/serverdata.proto b/dev/nanopb/serverdata.proto new file mode 100644 index 00000000..3d74e6b0 --- /dev/null +++ b/dev/nanopb/serverdata.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package serverdata.v2; + +message ServerData { + // ensure old type is reserved + reserved 1; + reserved "value"; + + string unit = 2; + // time since unix epoch in MICROSECONDS + uint64 time_us = 3; + repeated float values = 4; +} diff --git a/patches/0322026_nx_stm32_query_status.patch b/patches/0322026_nx_stm32_query_status.patch new file mode 100644 index 00000000..d070f5d3 --- /dev/null +++ b/patches/0322026_nx_stm32_query_status.patch @@ -0,0 +1,30 @@ +diff --git a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c +index d93161a..4e35bca 100644 +--- a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c ++++ b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c +@@ -265,6 +265,12 @@ NX_INTERFACE *interface_ptr; + } + #endif /* NX_ENABLE_INTERFACE_CAPABILITY */ + ++ case NX_DRIVER_GET_STATE: ++ { ++ *(driver_req_ptr->nx_ip_driver_return_ptr) = nx_driver_information.nx_driver_information_state; ++ break; ++ } ++ + default: + + +diff --git a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h +index 0198ed9..a94fa14 100644 +--- a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h ++++ b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h +@@ -72,6 +72,8 @@ extern "C" { + #define NX_DRIVER_STATE_INITIALIZED 3 + #define NX_DRIVER_STATE_LINK_ENABLED 4 + ++#define NX_DRIVER_GET_STATE 51 ++ + #define NX_DRIVER_ERROR 90 + +