diff --git a/conf/mctpd.conf b/conf/mctpd.conf index 3b7f9097..d12e4b19 100644 --- a/conf/mctpd.conf +++ b/conf/mctpd.conf @@ -5,6 +5,9 @@ mode = "bus-owner" [mctp] message_timeout_ms = 30 +# Delay between each Get Routing Table message. +routing_table_polling_interval_ms = 1000 + # Specify a UUID: not generally required - mctpd will query the system UUID # where available. # uuid = "21f0f554-7f7c-4211-9ca1-6d0f000ea9e7" diff --git a/meson.build b/meson.build index 7975b7a7..5141b295 100644 --- a/meson.build +++ b/meson.build @@ -60,11 +60,13 @@ toml_dep = declare_dependency( executable('mctp', sources: ['src/mctp.c'] + netlink_sources + util_sources + ops_sources, install: true, + c_args: ['-DHAVE_LIBSYSTEMD=0'], ) mctp_test = executable('test-mctp', sources: ['src/mctp.c'] + netlink_sources + util_sources + test_ops_sources, include_directories: include_directories('src'), + c_args: ['-DHAVE_LIBSYSTEMD=0'], ) executable('mctp-req', @@ -92,6 +94,7 @@ if libsystemd.found() dependencies: [libsystemd, toml_dep], install: true, install_dir: get_option('sbindir'), + c_args: ['-DHAVE_LIBSYSTEMD=1'], ) mctpd_test = executable('test-mctpd', @@ -100,6 +103,7 @@ if libsystemd.found() ] + test_ops_sources + netlink_sources + util_sources, include_directories: include_directories('src'), dependencies: [libsystemd, toml_dep], + c_args: ['-DHAVE_LIBSYSTEMD=1'], ) endif diff --git a/src/mctp-control-spec.h b/src/mctp-control-spec.h index 493ac578..1e9d5b44 100644 --- a/src/mctp-control-spec.h +++ b/src/mctp-control-spec.h @@ -413,3 +413,11 @@ enum mctp_phys_binding { MCTP_PHYS_BINDING_UCIE = 0x09, MCTP_PHYS_BINDING_VENDOR = 0xFF, }; + +#define MCTP_GET_ROUTING_TABLE_MSG_PHYSICAL_ADDRESS(entryh) \ + ((void *)((char *)(entryh) + sizeof(struct get_routing_table_entry))) +#define MCTP_GET_ROUTING_TABLE_MSG_NEXT(entryh) \ + ((struct get_routing_table_entry \ + *)((char *)(entryh) + \ + sizeof(struct get_routing_table_entry) + \ + (entryh)->phys_address_size)) diff --git a/src/mctp-ops.c b/src/mctp-ops.c index 0088dd31..15baded0 100644 --- a/src/mctp-ops.c +++ b/src/mctp-ops.c @@ -9,6 +9,9 @@ #include #include +#if HAVE_LIBSYSTEMD +#include +#endif #include #include "mctp.h" @@ -74,6 +77,12 @@ const struct mctp_ops mctp_ops = { .recvfrom = mctp_op_recvfrom, .close = mctp_op_close, }, +#if HAVE_LIBSYSTEMD + .sd_event = { + .add_time_relative = sd_event_add_time_relative, + .source_set_time_relative = sd_event_source_set_time_relative, + }, +#endif .bug_warn = mctp_bug_warn, }; diff --git a/src/mctp-ops.h b/src/mctp-ops.h index 105072ba..8348742b 100644 --- a/src/mctp-ops.h +++ b/src/mctp-ops.h @@ -7,6 +7,7 @@ */ #pragma once +#include #include #include @@ -24,9 +25,23 @@ struct socket_ops { int (*close)(int sd); }; +struct sd_event; +struct sd_event_source; +struct sd_event_ops { + int (*add_time_relative)(struct sd_event *e, + struct sd_event_source **ret, clockid_t clock, + uint64_t usec, uint64_t accuracy, + int (*callback)(struct sd_event_source *s, + uint64_t usec, void *userdata), + void *userdata); + int (*source_set_time_relative)(struct sd_event_source *s, + uint64_t usec); +}; + struct mctp_ops { struct socket_ops mctp; struct socket_ops nl; + struct sd_event_ops sd_event; void (*bug_warn)(const char *fmt, va_list args); }; diff --git a/src/mctpd.c b/src/mctpd.c index 36a23727..3b1ace0f 100644 --- a/src/mctpd.c +++ b/src/mctpd.c @@ -126,7 +126,6 @@ enum discovery_state { }; struct link { - enum discovery_state discovered; bool published; int ifindex; enum endpoint_role role; @@ -135,6 +134,14 @@ struct link { sd_bus_slot *slot_iface; sd_bus_slot *slot_busowner; + struct { + enum discovery_state flag; + sd_event_source *notify_source; + dest_phys notify_dest; + uint64_t notify_retry_delay; + uint8_t notify_tries_left; + } discovery; + struct ctx *ctx; }; @@ -199,6 +206,10 @@ struct peer { // Pool size uint8_t pool_size; uint8_t pool_start; + + struct { + sd_event_source *source; + } routing_table_polling; }; struct msg_type_support { @@ -234,6 +245,9 @@ struct ctx { // Timeout in usecs for a MCTP response uint64_t mctp_timeout; + // Interval in usecs between routing table requests + uint64_t routing_table_polling_interval; + // Next IID to use uint8_t iid; @@ -269,6 +283,7 @@ static int publish_peer(struct peer *peer); static int unpublish_peer(struct peer *peer); static int peer_route_update(struct peer *peer, uint16_t type); static int peer_neigh_update(struct peer *peer, uint16_t type); +static int peer_routing_table_polling_enable(struct peer *peer); static int add_interface_local(struct ctx *ctx, int ifindex); static int del_interface(struct link *link); @@ -497,8 +512,9 @@ static int wait_fd_timeout(int fd, short events, uint64_t timeout_usec) if (rc < 0) goto out; - rc = sd_event_add_time_relative(ev, NULL, CLOCK_MONOTONIC, timeout_usec, - 0, cb_exit_loop_timeout, NULL); + rc = mctp_ops.sd_event.add_time_relative(ev, NULL, CLOCK_MONOTONIC, + timeout_usec, 0, + cb_exit_loop_timeout, NULL); if (rc < 0) goto out; @@ -804,8 +820,12 @@ static int handle_control_set_endpoint_id(struct ctx *ctx, int sd, warnx("ERR: cannot add bus owner to object lists"); } - if (link_data->discovered != DISCOVERY_UNSUPPORTED) { - link_data->discovered = DISCOVERY_DISCOVERED; + if (link_data->discovery.flag != DISCOVERY_UNSUPPORTED) { + link_data->discovery.flag = DISCOVERY_DISCOVERED; + } + rc = peer_routing_table_polling_enable(peer); + if (rc) { + warnx("failed to setup routing table polling for bus owner"); } resp->status = SET_MCTP_EID_ASSIGNMENT_STATUS(MCTP_SET_EID_ACCEPTED) | @@ -816,13 +836,13 @@ static int handle_control_set_endpoint_id(struct ctx *ctx, int sd, return reply_message(ctx, sd, resp, resp_len, addr); case MCTP_SET_EID_DISCOVERED: - if (link_data->discovered == DISCOVERY_UNSUPPORTED) { + if (link_data->discovery.flag == DISCOVERY_UNSUPPORTED) { resp->completion_code = MCTP_CTRL_CC_ERROR_INVALID_DATA; resp_len = sizeof(struct mctp_ctrl_resp); return reply_message(ctx, sd, resp, resp_len, addr); } - link_data->discovered = DISCOVERY_DISCOVERED; + link_data->discovery.flag = DISCOVERY_DISCOVERED; resp->status = SET_MCTP_EID_ASSIGNMENT_STATUS(MCTP_SET_EID_REJECTED) | SET_MCTP_EID_ALLOCATION_STATUS(MCTP_SET_EID_POOL_NONE); @@ -1060,7 +1080,7 @@ static int handle_control_prepare_endpoint_discovery( resp = (void *)resp; mctp_ctrl_msg_hdr_init_resp(&resp->ctrl_hdr, *req); - if (link_data->discovered == DISCOVERY_UNSUPPORTED) { + if (link_data->discovery.flag == DISCOVERY_UNSUPPORTED) { warnx("received prepare for discovery request to unsupported interface %d", addr->smctp_ifindex); resp->completion_code = MCTP_CTRL_CC_ERROR_UNSUPPORTED_CMD; @@ -1068,8 +1088,8 @@ static int handle_control_prepare_endpoint_discovery( sizeof(struct mctp_ctrl_resp), addr); } - if (link_data->discovered == DISCOVERY_DISCOVERED) { - link_data->discovered = DISCOVERY_UNDISCOVERED; + if (link_data->discovery.flag == DISCOVERY_DISCOVERED) { + link_data->discovery.flag = DISCOVERY_UNDISCOVERED; warnx("clear discovered flag of interface %d", addr->smctp_ifindex); } @@ -1104,13 +1124,13 @@ handle_control_endpoint_discovery(struct ctx *ctx, int sd, return 0; } - if (link_data->discovered == DISCOVERY_UNSUPPORTED) { + if (link_data->discovery.flag == DISCOVERY_UNSUPPORTED) { resp->completion_code = MCTP_CTRL_CC_ERROR_INVALID_DATA; return reply_message(ctx, sd, resp, sizeof(struct mctp_ctrl_resp), addr); } - if (link_data->discovered == DISCOVERY_DISCOVERED) { + if (link_data->discovery.flag == DISCOVERY_DISCOVERED) { // if we are already discovered (i.e, assigned an EID), then no reply return 0; } @@ -1901,6 +1921,8 @@ static int remove_peer(struct peer *peer) sd_event_source_unref(peer->recovery.source); } + sd_event_source_disable_unref(peer->routing_table_polling.source); + n->peers[peer->eid] = NULL; free(peer->message_types); free(peer->uuid); @@ -2283,6 +2305,62 @@ static int query_get_endpoint_id(struct ctx *ctx, const dest_phys *dest, return rc; } +static int query_get_routing_table(struct ctx *ctx, struct peer *peer, + uint8_t handle, + struct get_routing_table_entry **entries, + size_t *entries_count, uint8_t *next_handle) +{ + struct sockaddr_mctp_ext addr; + struct mctp_ctrl_cmd_get_routing_table req = { 0 }; + struct mctp_ctrl_resp_get_routing_table *resp = NULL; + uint8_t *buf = NULL; + size_t buf_size; + uint8_t iid; + int rc; + + iid = mctp_next_iid(ctx); + + req.ctrl_hdr.rq_dgram_inst = RQDI_REQ | iid; + req.ctrl_hdr.command_code = MCTP_CTRL_CMD_GET_ROUTING_TABLE_ENTRIES; + + req.entry_handle = handle; + + rc = endpoint_query_peer(peer, MCTP_CTRL_HDR_MSG_TYPE, &req, + sizeof(req), &buf, &buf_size, &addr); + if (rc < 0) + goto out; + + rc = mctp_ctrl_validate_response( + buf, buf_size, sizeof(*resp), peer_tostr_short(peer), iid, + MCTP_CTRL_CMD_GET_ROUTING_TABLE_ENTRIES); + if (rc) + goto out; + + resp = (void *)buf; + + *next_handle = resp->next_entry_handle; + *entries_count = resp->number_of_entries; + if (*entries_count == 0) { + *entries = NULL; + goto out; + } + + *entries = malloc(resp->number_of_entries * + sizeof(struct get_routing_table_entry) + + 1024); + if (*entries == NULL) { + rc = -ENOMEM; + goto out; + } + + memcpy(*entries, resp + 1, + resp->number_of_entries * + sizeof(struct get_routing_table_entry)); +out: + free(buf); + return rc; +} + /* Returns 0, and ret_peer associated with the endpoint. * Returns 0, ret_peer=NULL if the endpoint successfully replies "not yet assigned". * Returns negative error code on failure. @@ -3239,8 +3317,8 @@ static int peer_endpoint_recover(sd_event_source *s, uint64_t usec, reschedule: if (peer->recovery.npolls > 0) { - rc = sd_event_source_set_time_relative(peer->recovery.source, - peer->recovery.delay); + rc = mctp_ops.sd_event.source_set_time_relative( + peer->recovery.source, peer->recovery.delay); if (rc >= 0) { rc = sd_event_source_set_enabled(peer->recovery.source, SD_EVENT_ONESHOT); @@ -3275,7 +3353,7 @@ static int method_endpoint_recover(sd_bus_message *call, void *data, peer->recovery.npolls = MCTP_I2C_TSYM_MN1_MIN + 1; peer->recovery.delay = (MCTP_I2C_TSYM_TRECLAIM_MIN_US / 2) - ctx->mctp_timeout; - rc = sd_event_add_time_relative( + rc = mctp_ops.sd_event.add_time_relative( ctx->event, &peer->recovery.source, CLOCK_MONOTONIC, 0, ctx->mctp_timeout, peer_endpoint_recover, peer); if (rc < 0) { @@ -3339,6 +3417,107 @@ static int method_endpoint_set_mtu(sd_bus_message *call, void *data, return rc; } +static int peer_routing_table_polling_callback(sd_event_source *source, + uint64_t time, void *userdata) +{ + struct peer *peer = userdata; + struct peer *remote_peer = NULL; + struct get_routing_table_entry *entry = NULL; + size_t entries_count = 0; + uint8_t handle = 0x00; + size_t i; + int rc; + + assert(peer->routing_table_polling.source == source); + + while (handle != 0xFF) { + rc = query_get_routing_table(peer->ctx, peer, handle, &entry, + &entries_count, &handle); + if (rc < 0) { + warnx("failed to fetch routing table from peer %s", + peer_tostr(peer)); + return 0; + } + dfree(entry); + + for (i = 0; i < entries_count; + i++, entry = MCTP_GET_ROUTING_TABLE_MSG_NEXT(entry)) { + // Add Bridge/Endpoint to routing table + + switch (GET_ROUTING_ENTRY_TYPE(entry->entry_type)) { + case MCTP_ROUTING_ENTRY_ENDPOINT: + case MCTP_ROUTING_ENTRY_BRIDGE: + case MCTP_ROUTING_ENTRY_BRIDGE_AND_ENDPOINTS: + rc = add_peer(peer->ctx, &peer->phys, + entry->starting_eid, peer->net, + &remote_peer, true); + if (rc == -EEXIST) { + continue; + } else if (rc < 0) { + warnx("failed to add new peer: %s", + strerror(-rc)); + continue; + } + + rc = setup_added_peer(remote_peer); + if (rc < 0) { + warnx("failed to set up new peer: %s", + strerror(-rc)); + continue; + } + + // TODO: port? + + break; + }; + + // For bridge, enable routing table polling recursively + + switch (GET_ROUTING_ENTRY_TYPE(entry->entry_type)) { + case MCTP_ROUTING_ENTRY_BRIDGE: + case MCTP_ROUTING_ENTRY_BRIDGE_AND_ENDPOINTS: + rc = peer_routing_table_polling_enable( + remote_peer); + if (rc < 0) { + warnx("failed to enable routing table polling on bridge: %s", + strerror(-rc)); + continue; + } + + break; + } + } + } + + // rearm timer + rc = mctp_ops.sd_event.source_set_time_relative( + source, peer->ctx->routing_table_polling_interval); + if (rc) { + warn("failed to rearm timer"); + } + + return 0; +} + +static int peer_routing_table_polling_enable(struct peer *peer) +{ + int rc = 0; + + if (peer->routing_table_polling.source != NULL) { + return 0; + } + + rc = mctp_ops.sd_event.add_time_relative( + peer->ctx->event, &peer->routing_table_polling.source, + CLOCK_MONOTONIC, peer->ctx->routing_table_polling_interval, 0, + peer_routing_table_polling_callback, peer); + + rc = sd_event_source_set_enabled(peer->routing_table_polling.source, + SD_EVENT_ON); + + return rc; +} + static int method_net_learn_endpoint(sd_bus_message *call, void *data, sd_bus_error *berr) { @@ -3658,6 +3837,88 @@ static int bus_link_get_prop(sd_bus *bus, const char *path, return rc; } +static int query_discovery_notify(struct link *link) +{ + struct mctp_ctrl_cmd_discovery_notify req = { 0 }; + struct mctp_ctrl_resp_discovery_notify *resp; + struct sockaddr_mctp_ext resp_addr; + size_t buf_size; + uint8_t *buf; + int rc; + + mctp_ctrl_msg_hdr_init_req(&req.ctrl_hdr, mctp_next_iid(link->ctx), + MCTP_CTRL_CMD_DISCOVERY_NOTIFY); + + rc = endpoint_query_phys(link->ctx, &link->discovery.notify_dest, + MCTP_CTRL_HDR_MSG_TYPE, &req, sizeof(req), + &buf, &buf_size, &resp_addr); + if (rc < 0) + goto free_buf; + + if (buf_size != sizeof(*resp)) { + warnx("%s: wrong reply length %zu bytes. dest %s", __func__, + buf_size, dest_phys_tostr(&link->discovery.notify_dest)); + rc = -ENOMSG; + goto free_buf; + } + + resp = (void *)buf; + if (resp->completion_code != 0) { + warnx("Failure completion code 0x%02x from %s", + resp->completion_code, + dest_phys_tostr(&link->discovery.notify_dest)); + rc = -ECONNREFUSED; + goto free_buf; + } + +free_buf: + free(buf); + return rc; +} + +static int link_discovery_notify_callback(sd_event_source *source, + uint64_t time, void *userdata) +{ + struct link *link = userdata; + struct ctx *ctx = link->ctx; + int rc; + + // sanity check + assert(link->discovery.notify_source == source); + + // Discovery notify succeeded + if (link->discovery.flag == DISCOVERY_DISCOVERED) + goto disarm; + + rc = query_discovery_notify(link); + if (rc < 0) { + if (ctx->verbose) { + warnx("failed to send discovery notify at retry %d: %s", + link->discovery.notify_tries_left, strerror(-rc)); + } + } + + link->discovery.notify_tries_left -= 1; + if (link->discovery.notify_tries_left == 0) { + warnx("failed to send discovery notify after all retries"); + goto disarm; + } + + rc = mctp_ops.sd_event.source_set_time_relative( + source, link->discovery.notify_retry_delay); + if (rc < 0) { + warnx("failed to rearm discovery notify timer"); + goto disarm; + } + + return 0; + +disarm: + sd_event_source_disable_unref(source); + link->discovery.notify_source = NULL; + return 0; +} + static int bus_link_set_prop(sd_bus *bus, const char *path, const char *interface, const char *property, sd_bus_message *value, void *userdata, @@ -4495,7 +4756,7 @@ static int add_interface(struct ctx *ctx, int ifindex) if (!link) return -ENOMEM; - link->discovered = DISCOVERY_UNSUPPORTED; + link->discovery.flag = DISCOVERY_UNSUPPORTED; link->published = false; link->ifindex = ifindex; link->ctx = ctx; @@ -4525,7 +4786,42 @@ static int add_interface(struct ctx *ctx, int ifindex) } if (phys_binding == MCTP_PHYS_BINDING_PCIE_VDM) { - link->discovered = DISCOVERY_UNDISCOVERED; + link->discovery.flag = DISCOVERY_UNDISCOVERED; + // TODO: These numbers are respectively MN1 and MT4, specified in DSP0239 + // control message timing. + // + // Might need to extract these to macros like MCTP_I2C_TSYM_* in this file, + // or a commit to actually centralize those timing at one place, now that + // we have support for detecting link binding type. + link->discovery.notify_tries_left = 3; + link->discovery.notify_retry_delay = 5000000; + + // For PCIe-VDM, we want an all zeroes address for Route-to-Root-Complex. + rc = mctp_nl_hwaddr_len_byindex( + ctx->nl, ifindex, + &link->discovery.notify_dest.hwaddr_len); + if (rc < 0) { + warnx("Can't find hwaddr_len by index %d", ifindex); + return -ENOENT; + } + + memset(link->discovery.notify_dest.hwaddr, 0, + link->discovery.notify_dest.hwaddr_len); + link->discovery.notify_dest.ifindex = ifindex; + + rc = mctp_ops.sd_event.add_time_relative( + ctx->event, &link->discovery.notify_source, + CLOCK_MONOTONIC, 0, 0, link_discovery_notify_callback, + link); + if (rc >= 0) { + rc = sd_event_source_set_enabled( + link->discovery.notify_source, SD_EVENT_ON); + } + if (rc < 0) { + warnx("Failed to arm discovery notify timer"); + sd_event_source_disable_unref( + link->discovery.notify_source); + } } link->published = true; @@ -4673,6 +4969,16 @@ static int parse_config_mctp(struct ctx *ctx, toml_table_t *mctp_tab) ctx->mctp_timeout = i * 1000; } + val = toml_int_in(mctp_tab, "routing_table_polling_interval_ms"); + if (val.ok) { + int64_t i = val.u.i; + if (i <= 0 || i > 100 * 1000) { + warnx("invalid routing_table_polling_interval_ms value"); + return -1; + } + ctx->routing_table_polling_interval = i * 1000; + } + val = toml_string_in(mctp_tab, "uuid"); if (val.ok) { rc = sd_id128_from_string(val.u.s, (void *)&ctx->uuid); @@ -4849,6 +5155,7 @@ static void setup_config_defaults(struct ctx *ctx) { ctx->mctp_timeout = 250000; // 250ms ctx->default_role = ENDPOINT_ROLE_BUS_OWNER; + ctx->routing_table_polling_interval = 1000000; // 1s ctx->max_pool_size = 15; ctx->dyn_eid_min = eid_alloc_min; ctx->dyn_eid_max = eid_alloc_max; diff --git a/tests/conftest.py b/tests/conftest.py index b87dc2b1..7fca8e81 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ import pytest import asyncdbus +import trio.testing import mctpenv @@ -35,3 +36,10 @@ async def mctpd(nursery, dbus, sysnet, config): @pytest.fixture async def mctp(nursery, sysnet): return mctpenv.MctpWrapper(nursery, sysnet) + +@pytest.fixture +def autojump_clock(): + """ + Custom autojump clock with a reasonable threshold for non-time I/O waits + """ + return trio.testing.MockClock(autojump_threshold=0.01) diff --git a/tests/mctp-ops-test.c b/tests/mctp-ops-test.c index 7b63cd4e..df3927da 100644 --- a/tests/mctp-ops-test.c +++ b/tests/mctp-ops-test.c @@ -7,6 +7,7 @@ #define _GNU_SOURCE +#include #include #include #include @@ -18,6 +19,9 @@ #include #include +#if HAVE_LIBSYSTEMD +#include +#endif #include #include "mctp-ops.h" @@ -38,10 +42,12 @@ static int mctp_op_socket(int type) struct iovec iov; int rc, var, sd; - if (type == AF_MCTP) + if (type == CONTROL_OP_SOCKET_MCTP) req.type = CONTROL_OP_SOCKET_MCTP; - else if (type == AF_NETLINK) + else if (type == CONTROL_OP_SOCKET_NL) req.type = CONTROL_OP_SOCKET_NL; + else if (type == CONTROL_OP_TIMER) + req.type = CONTROL_OP_TIMER; else errx(EXIT_FAILURE, "invalid socket type?"); @@ -72,12 +78,12 @@ static int mctp_op_socket(int type) static int mctp_op_mctp_socket(void) { - return mctp_op_socket(AF_MCTP); + return mctp_op_socket(CONTROL_OP_SOCKET_MCTP); } static int mctp_op_netlink_socket(void) { - return mctp_op_socket(AF_NETLINK); + return mctp_op_socket(CONTROL_OP_SOCKET_NL); } static int mctp_op_bind(int sd, struct sockaddr *addr, socklen_t addrlen) @@ -221,6 +227,115 @@ static void mctp_bug_warn(const char *fmt, va_list args) abort(); } +#if HAVE_LIBSYSTEMD +struct wrapped_time_userdata { + sd_event_time_handler_t callback; + void *userdata; +}; + +int wrapped_time_callback(sd_event_source *source, int fd, uint revents, + void *userdata) +{ + struct wrapped_time_userdata *wrapud = userdata; + uint64_t usec; + ssize_t rc; + + rc = read(fd, &usec, sizeof(usec)); + if (rc != 8) + errx(EXIT_FAILURE, "ops protocol error"); + + rc = wrapud->callback(source, usec, wrapud->userdata); + warnx("%ld", rc); + + return 0; +} + +void wrapped_time_destroy(void *wrapud) +{ + free(wrapud); +} + +static int mctp_op_sd_event_add_time_relative( + sd_event *e, sd_event_source **ret, clockid_t clock, uint64_t usec, + uint64_t accuracy, sd_event_time_handler_t callback, void *userdata) +{ + struct wrapped_time_userdata *wrapud = NULL; + sd_event_source *source = NULL; + int sd = -1; + int rc = 0; + + sd = mctp_op_socket(CONTROL_OP_TIMER); + if (sd < 0) + return -errno; + + rc = write(sd, &usec, sizeof(usec)); + if (rc != 8) + errx(EXIT_FAILURE, "ops protocol error"); + + wrapud = malloc(sizeof(*wrapud)); + if (!wrapud) { + rc = -ENOMEM; + goto fail; + } + + wrapud->callback = callback; + wrapud->userdata = userdata; + + rc = sd_event_add_io(e, &source, sd, EPOLLIN, wrapped_time_callback, + wrapud); + if (rc < 0) + goto fail; + + rc = sd_event_source_set_destroy_callback(source, wrapped_time_destroy); + if (rc < 0) + goto fail; + + wrapud = NULL; + + rc = sd_event_source_set_io_fd_own(source, 1); + if (rc < 0) + goto fail; + + sd = -1; + + rc = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT); + if (rc < 0) + goto fail; + + if (!ret) { + rc = sd_event_source_set_floating(source, 1); + if (rc < 0) + goto fail; + + sd_event_source_unref(source); + } else { + *ret = source; + } + + return 0; + +fail: + if (sd > 0) + close(sd); + free(wrapud); + sd_event_source_disable_unref(*ret); + return rc; +} + +static int mctp_op_sd_event_source_set_time_relative(sd_event_source *s, + uint64_t usec) +{ + int sd = sd_event_source_get_io_fd(s); + ssize_t rc; + + rc = write(sd, &usec, sizeof(usec)); + if (rc != 8) + errx(EXIT_FAILURE, "ops protocol error"); + + return 0; +} +#endif + const struct mctp_ops mctp_ops = { .mctp = { .socket = mctp_op_mctp_socket, @@ -238,6 +353,12 @@ const struct mctp_ops mctp_ops = { .recvfrom = mctp_op_recvfrom, .close = mctp_op_close, }, +#if HAVE_LIBSYSTEMD + .sd_event = { + .add_time_relative = mctp_op_sd_event_add_time_relative, + .source_set_time_relative = mctp_op_sd_event_source_set_time_relative, + }, +#endif .bug_warn = mctp_bug_warn, }; diff --git a/tests/mctpenv/__init__.py b/tests/mctpenv/__init__.py index d1fc6714..9b4ad990 100644 --- a/tests/mctpenv/__init__.py +++ b/tests/mctpenv/__init__.py @@ -2,6 +2,7 @@ import array import enum import errno +import math import os import signal import socket @@ -1113,6 +1114,32 @@ async def notify_delroute(self, route): await self._notify_route(route, rtnl.RTM_DELROUTE); +class TimerSocket(BaseSocket): + def __init__(self, sock): + super().__init__(sock) + self.delay = sys.maxsize + + async def run(self): + while True: + try: + with trio.move_on_after(self.delay / 1000000) as scope: + # now = math.floor(trio.current_time() * 1000000) + # await self.sock.send(struct.pack('@Q', now)) + data = await self.sock.recv(8) + if len(data) == 0: + break + + (next_delay,) = struct.unpack('@Q', data) + self.delay = next_delay + + # timed out + if scope.cancelled_caught: + await self.sock.send(struct.pack('@Q', math.floor(trio.current_time() * 1000000))) + self.delay = sys.maxsize + except (ConnectionResetError, BrokenPipeError) as ex: + break + + async def send_fd(sock, fd): fdarray = array.array("i", [fd]) await sock.sendmsg([b'x'], [ @@ -1158,6 +1185,14 @@ async def handle_control(self, nursery): remote.close() nursery.start_soon(nl.run) + elif op == 0x03: + # Timer socket + (local, remote) = self.socketpair() + sd = TimerSocket(local) + await send_fd(self.sock_local, remote.fileno()) + remote.close() + nursery.start_soon(sd.run) + else: print(f"unknown op {op}") diff --git a/tests/test-proto.h b/tests/test-proto.h index db3e8454..89c2f4d9 100644 --- a/tests/test-proto.h +++ b/tests/test-proto.h @@ -9,6 +9,7 @@ enum { CONTROL_OP_INIT, CONTROL_OP_SOCKET_MCTP, CONTROL_OP_SOCKET_NL, + CONTROL_OP_TIMER, }; struct control_msg_req { diff --git a/tests/test_mctpd.py b/tests/test_mctpd.py index ffb66944..8113df36 100644 --- a/tests/test_mctpd.py +++ b/tests/test_mctpd.py @@ -50,6 +50,7 @@ async def _introspect_path_recursive(dbus, path, node_set): return dups + """ Test that the dbus object tree is sensible: we can introspect all objects, and that there are no duplicates """ @@ -187,7 +188,7 @@ def ep_connectivity_changed(iface, changed, invalidated): # to transition 'Connectivity' to 'Available', which is a test failure. assert not expected.cancelled_caught -async def test_recover_endpoint_removed(dbus, mctpd): +async def test_recover_endpoint_removed(dbus, mctpd, autojump_clock): iface = mctpd.system.interfaces[0] dev = mctpd.network.endpoints[0] mctp = await dbus.get_proxy_object(MCTPD_C, MCTPD_MCTP_P) @@ -224,7 +225,7 @@ def ep_removed(ep_path, interfaces): assert not expected.cancelled_caught -async def test_recover_endpoint_reset(dbus, mctpd): +async def test_recover_endpoint_reset(dbus, mctpd, autojump_clock): iface = mctpd.system.interfaces[0] dev = mctpd.network.endpoints[0] mctp = await dbus.get_proxy_object(MCTPD_C, MCTPD_MCTP_P) @@ -260,7 +261,7 @@ def ep_connectivity_changed(iface, changed, invalidated): assert not expected.cancelled_caught -async def test_recover_endpoint_exchange(dbus, mctpd): +async def test_recover_endpoint_exchange(dbus, mctpd, autojump_clock): iface = mctpd.system.interfaces[0] dev = mctpd.network.endpoints[0] mctp = await dbus.get_proxy_object(MCTPD_C, MCTPD_MCTP_P) @@ -628,7 +629,7 @@ async def test_network_local_eids_none(dbus, mctpd): assert eids == [] -async def test_concurrent_recovery_setup(dbus, mctpd): +async def test_concurrent_recovery_setup(dbus, mctpd, autojump_clock): iface = mctpd.system.interfaces[0] mctp_i = await mctpd_mctp_iface_obj(dbus, iface) diff --git a/tests/test_mctpd_endpoint.py b/tests/test_mctpd_endpoint.py index 82ecf28c..6e6ecd4d 100644 --- a/tests/test_mctpd_endpoint.py +++ b/tests/test_mctpd_endpoint.py @@ -22,11 +22,16 @@ async def iface(): @pytest.fixture -async def sysnet(iface): +async def bo(iface): + return Endpoint(iface, bytes([0x10]), eid=8) + + +@pytest.fixture +async def sysnet(iface, bo): system = System() await system.add_interface(iface) network = Network() - network.add_endpoint(Endpoint(iface, bytes([0x10]), eid=8)) + network.add_endpoint(bo) return Sysnet(system, network) @@ -113,12 +118,100 @@ class TestDiscovery: async def iface(self): return System.Interface("mctp0", 1, 1, bytes([0x1D]), 68, 254, True, PhysicalBinding.PCIE_VDM) + @pytest.fixture + async def bo(self, iface): + return TestDiscovery.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + print(addr, data) + flags, opcode = data[0:2] + if opcode != 0x0D: + return await super().handle_mctp_control(sock, addr, data) + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00])) + self.sem.release() + + """ Test simple Discovery sequence """ async def test_simple_discovery_sequence(self, dbus, mctpd): bo = mctpd.network.endpoints[0] assert len(mctpd.system.addresses) == 0 + # BMC should send a Discovery Notify message + with trio.move_on_after(5) as expected: + await bo.sem.acquire() + assert not expected.cancelled_caught + + # no EID yet + rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x02)) + assert rsp.hex(' ') == '00 02 00 00 02 00' + + # BMC response to Prepare for Discovery + rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x0B)) + assert rsp.hex(' ') == '00 0b 00' + + # BMC response to Endpoint Discovery + rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x0C)) + assert rsp.hex(' ') == '00 0c 00' + + # set EID = 42 + eid = 42 + rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, eid]))) + assert rsp.hex(' ') == f'00 01 00 00 {eid:02x} 00' + + # BMC should contains two object paths: bus owner and itself + assert await mctpd_mctp_endpoint_control_obj(dbus, f"/au/com/codeconstruct/mctp1/networks/1/endpoints/{bo.eid}") + assert await mctpd_mctp_endpoint_control_obj(dbus, f"/au/com/codeconstruct/mctp1/networks/1/endpoints/{eid}") + + +class TestDiscoveryRetry: + @pytest.fixture + async def iface(self): + return System.Interface("mctp0", 1, 1, bytes([0x1D]), 68, 254, True, PhysicalBinding.PCIE_VDM) + + @pytest.fixture + async def bo(self, iface): + return TestDiscoveryRetry.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + self.retry_left = 1 + + async def handle_mctp_control(self, sock, src_addr, msg): + flags, opcode = msg[0:2] + if opcode != 0x0D: + return await super().handle_mctp_control(sock, src_addr, msg) + + # only reply after 2 retries + if self.retry_left == 0: + dst_addr = MCTPSockAddr.for_ep_resp(self, src_addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00])) + self.sem.release() + else: + self.retry_left -= 1 + + + """ Test simple Discovery sequence """ + async def test_discovery_after_one_retry(self, dbus, mctpd, autojump_clock): + bo = mctpd.network.endpoints[0] + + assert len(mctpd.system.addresses) == 0 + + # BMC should send a Discovery Notify message + with trio.move_on_after(10) as expected: + await bo.sem.acquire() + assert not expected.cancelled_caught + # no EID yet rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x02)) assert rsp.hex(' ') == '00 02 00 00 02 00' @@ -153,3 +246,288 @@ async def test_simple(self, dbus, mctpd): # BMC response ERROR_UNSUPPORTED_CMD to Prepare for Discovery rsp = await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x0B)) assert rsp.hex(' ') == '00 0b 05' + + +class TestGetEmptyRoutingTable: + @pytest.fixture + async def bo(self, iface): + return TestGetEmptyRoutingTable.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x02, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11])) + self.sem.release() + + async def test(self, dbus, mctpd, autojump_clock): + bo = mctpd.network.endpoints[0] + + # trigger get routing table via set eid + await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, 0x09]))) + + with trio.move_on_after(5) as expected: + await bo.sem.acquire() + + assert not expected.cancelled_caught + + +class TestGetThreeEntriesRoutingTable: + @pytest.fixture + async def bo(self, iface): + return TestGetThreeEntriesRoutingTable.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x03, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11, + 0x01, 66, 0b00_0_00000, 0x00, 0x00, 0x01, 0x12])) + self.sem.release() + + async def test(self, dbus, mctpd, autojump_clock): + bo = mctpd.network.endpoints[0] + + # trigger get routing table via set eid + await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, 0x09]))) + + with trio.move_on_after(5) as expected: + await bo.sem.acquire() + + assert not expected.cancelled_caught + + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/66") + + + + +class TestGetNestedRoutingTables: + """ + Test sending nested routing table. + + This is the topology (we are eid=9): + + ┌───────┐ ┌───────┐ + │ ┼──► EID 9 │ + │ │ └───────┘ + │ │ + │ │ + │ EID 8 │ + │ │ ┌────────┐ ┌──────┐ + │ │ │ ┼──►EID 11│ + │ │ │ │ └──────┘ + │ ┼──► EID 10 │ + │ │ │ │ ┌──────┐ + │ │ │ ├──►EID 12│ + └───────┘ └────────┘ └──────┘ + """ + + # Stub for Bus Owner with eid=8 + class FirstBusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x03, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11, + 0x03, 10, 0b01_0_00000, 0x00, 0x00, 0x01, 0x12])) + self.sem.release() + + # Stub for Bus Owner with eid=10 + class SecondBusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x05, # len + 0x10, 10, 0b10_0_00000, 0x00, 0x00, 0x01, 0x12, + 0x10, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11, + 0x10, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 11, 0b00_0_00001, 0x00, 0x00, 0x01, 0x13, + 0x01, 12, 0b00_0_00001, 0x00, 0x00, 0x01, 0x14])) + self.sem.release() + + @pytest.fixture + async def bo(self, iface): + return TestGetNestedRoutingTables.FirstBusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + async def test(self, dbus, mctpd, autojump_clock): + bo1 = mctpd.network.endpoints[0] + bo2 = TestGetNestedRoutingTables.SecondBusOwnerEndpoint(mctpd.system.interfaces[0], bytes([0x12]), eid=10) + mctpd.network.add_endpoint(bo2) + + ep1 = Endpoint(iface, bytes([0x13]), eid=11) + ep2 = Endpoint(iface, bytes([0x14]), eid=12) + + bo1.add_bridged_ep(bo2) + bo2.add_bridged_ep(ep1) + bo2.add_bridged_ep(ep2) + + + # trigger get routing table via set eid + await bo1.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, 0x09]))) + + with trio.move_on_after(5) as expected: + await bo1.sem.acquire() + await bo2.sem.acquire() + + assert not expected.cancelled_caught + + await trio.sleep(1) + + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/8") + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/9") + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/10") + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/11") + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/12") + + +class TestGetMultipleRoutingTableHandles: + @pytest.fixture + async def bo(self, iface): + return TestGetMultipleRoutingTableHandles.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + + if data[2] == 0x00: + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0x01, + 0x02, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11])) + return + + if data[2] == 0x01: + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x01, # len + 0x01, 66, 0b00_0_00000, 0x00, 0x00, 0x01, 0x12])) + self.sem.release() + return + + assert False + + async def test(self, dbus, mctpd, autojump_clock): + bo = mctpd.network.endpoints[0] + + # trigger get routing table via set eid + await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, 0x09]))) + + await trio.sleep(10) + + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/66") + + +class TestResetRoutingTableOnSetEid: + @pytest.fixture + async def bo(self, iface): + return TestGetMultipleRoutingTableHandles.BusOwnerEndpoint(iface, bytes([0x00]), eid=8) + + + class BusOwnerEndpoint(Endpoint): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sem = trio.Semaphore(initial_value=0) + self.network_is_down = False + + async def handle_mctp_control(self, sock, addr, data): + flags, opcode = data[0:2] + if opcode != 0x0A: + return await super().handle_mctp_control(sock, addr, data) + assert len(data) == 3 + dst_addr = MCTPSockAddr.for_ep_resp(self, addr, sock.addr_ext) + + if self.network_is_down: + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0x01, + 0x02, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11])) + else: + await sock.send(dst_addr, bytes([flags & 0x1F, opcode, 0x00, 0xFF, + 0x03, # len + 0x01, 8, 0b10_0_00000, 0x00, 0x00, 0x01, 0x10, + 0x01, 9, 0b00_0_00000, 0x00, 0x00, 0x01, 0x11, + 0x01, 66, 0b00_0_00000, 0x00, 0x00, 0x01, 0x12])) + self.sem.release() + + async def test(self, dbus, mctpd, autojump_clock): + bo = mctpd.network.endpoints[0] + + # set our eid=09 + await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x00, 0x09]))) + + with trio.move_on_after(5) as expected: + await bo.sem.acquire() + + assert not expected.cancelled_caught + + await trio.sleep(5) + + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/66") + + # here, assume network is reset and bus owner reset our EID + bo.network_is_down = True + + # force set our EID, expect EID 66 is gone + await bo.send_control(mctpd.network.mctp_socket, MCTPControlCommand(True, 0, 0x01, bytes([0x01, 0x09]))) + + with pytest.raises(asyncdbus.errors.DBusError) as ex: + await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/66") + + assert str(ex.value) == "Unknown object '/au/com/codeconstruct/mctp1/networks/1/endpoints/66'." + + # bus owner finished assigning all EIDs, network is up and routing table is ok again + # expect EID 66 is live + bo.network_is_down = False + with trio.move_on_after(5) as expected: + await bo.sem.acquire() + + await trio.sleep(5) + + assert await mctpd_mctp_endpoint_control_obj(dbus, "/au/com/codeconstruct/mctp1/networks/1/endpoints/66")