Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions packages/pynumaflow/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ setup:
poetry install --with dev --no-root

# proto: regenerate the Python protobuf/gRPC stubs by invoking
# grpc_tools.protoc once per package directory under pynumaflow/proto/.
# Each invocation emits type stubs (--pyi_out), message classes (--python_out)
# and gRPC service code (--grpc_python_out), all rooted at ".".
# NOTE(review): every package except `common` appears twice below - once
# without and once with the extra `-Ipynumaflow/proto/common=...` include path.
# This looks like the removed and added sides of a diff rendered together;
# confirm which set is current before relying on this target.
proto:
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sinker=pynumaflow/proto/sinker --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sinker/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/mapper=pynumaflow/proto/mapper --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/mapper/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/reducer=pynumaflow/proto/reducer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/reducer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcetransformer=pynumaflow/proto/sourcetransformer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcetransformer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sideinput=pynumaflow/proto/sideinput --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sideinput/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcer=pynumaflow/proto/sourcer --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/accumulator=pynumaflow/proto/accumulator --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/accumulator/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/common/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sinker=pynumaflow/proto/sinker -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sinker/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/mapper=pynumaflow/proto/mapper -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/mapper/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/reducer=pynumaflow/proto/reducer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/reducer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcetransformer=pynumaflow/proto/sourcetransformer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcetransformer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sideinput=pynumaflow/proto/sideinput -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sideinput/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/sourcer=pynumaflow/proto/sourcer -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/sourcer/*.proto
poetry run python3 -m grpc_tools.protoc -Ipynumaflow/proto/accumulator=pynumaflow/proto/accumulator -Ipynumaflow/proto/common=pynumaflow/proto/common --pyi_out=. --python_out=. --grpc_python_out=. pynumaflow/proto/accumulator/*.proto
104 changes: 102 additions & 2 deletions packages/pynumaflow/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

171 changes: 171 additions & 0 deletions packages/pynumaflow/pynumaflow/_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from dataclasses import dataclass, field
from typing import Optional
from pynumaflow.proto.common import metadata_pb2

"""
Metadata provides per-message metadata passed between vertices.

A vertex could create one or more sets of key-value pairs per group-name.
This is required because a vertex could forward a message to a
Kafka sink with Kafka headers, and a metrics Sink with some key/value points.

There will be two kinds of metadata:

system - generated by the system, exposed as read-only to UDFs
user - user generated with read-write access
|
|           +-> [group-m] -> {k1:v1, ... }
|           |
+-> [user]  +-> [group-n] -> {k1:v1, ... }
|           |
|           +-> [group-o] -> {k1:v1, ... }
|
|           +-> [group-h] -> {k1:v1, ... }
|           |
+-> [sys]   +-> [group-i] -> {k1:v1, ... }
|
"""


@dataclass
class SystemMetadata:
    """
    Read-only view over the system-generated metadata attached to a message.

    The metadata is held as a mapping of group name to that group's
    key-value pairs. Only lookup methods are provided here; the class
    defines no methods that add, change or remove entries.
    """

    # group name -> {key -> raw bytes value}
    _data: dict[str, dict[str, bytes]] = field(default_factory=dict)

    def groups(self) -> list[str]:
        """
        Returns the names of all system metadata groups as a new list.
        """
        return list(self._data)

    def keys(self, group: str) -> list[str]:
        """
        Returns the keys stored under the given group.
        An unknown group yields an empty list.
        """
        if group not in self._data:
            return []
        entries = self._data[group]
        return list(entries.keys())

    def value(self, group: str, key: str) -> Optional[bytes]:
        """
        Returns the value stored under the given group and key.
        Returns None when either the group or the key is absent.
        """
        if group not in self._data:
            return None
        entries = self._data[group]
        return entries.get(key)


@dataclass
class UserMetadata:
    """
    UserMetadata wraps the user-generated metadata groups per message. It is read-write to UDFs.

    The metadata is held as a mapping of group name to that group's
    key-value pairs. Groups keep their insertion order, which is the order
    reported by groups() and used when converting to protobuf.
    """

    # group name -> {key -> raw bytes value}
    _data: dict[str, dict[str, bytes]] = field(default_factory=dict)

    def groups(self) -> list[str]:
        """
        Returns the list of group names for the user metadata.
        """
        return list(self._data.keys())

    def keys(self, group: str) -> list[str]:
        """
        Returns the list of keys for a given group.
        Returns an empty list if the group does not exist.
        """
        keys = self._data.get(group) or {}
        return list(keys.keys())

    def __contains__(self, group: str) -> bool:
        """
        Returns True if the group exists.
        """
        return group in self._data

    def __getitem__(self, group: str) -> dict[str, bytes]:
        """
        Returns the data for a given group.
        Raises KeyError if the group does not exist.
        """
        return self._data[group]

    def __setitem__(self, group: str, data: dict[str, bytes]):
        """
        Sets the data for a given group, replacing any existing data.
        The given dict is stored as-is (it is not copied).
        """
        self._data[group] = data

    def __delitem__(self, group: str):
        """
        Removes the group and all its keys and values.
        Raises KeyError if the group does not exist.
        """
        del self._data[group]

    def __len__(self) -> int:
        """
        Returns the number of groups.
        """
        return len(self._data)

    def value(self, group: str, key: str) -> Optional[bytes]:
        """
        Returns the value for a given group and key.
        If the group or key does not exist, returns None.
        """
        value = self._data.get(group)
        if value is None:
            return None
        return value.get(key)

    def add_key(self, group: str, key: str, value: bytes):
        """
        Adds the value for a given group and key.
        The group is created if it does not exist; an existing key is overwritten.
        """
        self._data.setdefault(group, {})[key] = value

    def remove_key(self, group: str, key: str) -> Optional[bytes]:
        """
        Removes the key and its value for a given group and returns the value.
        If the group has no keys left afterwards, the group will be removed
        (this includes a group that was already empty).
        Returns None if the group or key does not exist.
        """
        group_data = self._data.get(group)
        if group_data is None:
            return None
        value = group_data.pop(key, None)
        # Mutate the group in place rather than popping and re-inserting it:
        # re-insertion would move the group to the end of the dict and
        # silently change the order reported by groups().
        if not group_data:
            del self._data[group]
        return value

    def remove_group(self, group: str) -> Optional[dict[str, bytes]]:
        """
        Removes the group and all its keys and values and returns the data.
        Returns None if the group does not exist.
        """
        return self._data.pop(group, None)

    def clear(self):
        """
        Clears all the groups and all their keys and values.
        """
        self._data.clear()

    def _to_proto(self) -> metadata_pb2.Metadata:
        """
        Converts the user metadata to a protobuf Metadata message.
        Each group becomes a KeyValueGroup entry in user_metadata;
        no other field of the message is set here.
        """
        return metadata_pb2.Metadata(
            user_metadata={
                group: metadata_pb2.KeyValueGroup(key_value=value)
                for group, value in self._data.items()
            },
        )


def _user_and_system_metadata_from_proto(
    proto: metadata_pb2.Metadata,
) -> tuple[UserMetadata, SystemMetadata]:
    """
    Builds the UserMetadata and SystemMetadata wrappers from a protobuf Metadata message.

    Each group's key_value mapping is copied into a new plain dict, keyed by
    group name, for the user and the system side respectively.
    """
    user_groups = {}
    for name, group in proto.user_metadata.items():
        user_groups[name] = dict(group.key_value)

    system_groups = {}
    for name, group in proto.sys_metadata.items():
        system_groups[name] = dict(group.key_value)

    return UserMetadata(user_groups), SystemMetadata(system_groups)
3 changes: 3 additions & 0 deletions packages/pynumaflow/pynumaflow/mapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pynumaflow.mapper.sync_server import MapServer

from pynumaflow.mapper._dtypes import Message, Messages, Datum, DROP, Mapper
from pynumaflow._metadata import UserMetadata, SystemMetadata

__all__ = [
"Message",
Expand All @@ -13,4 +14,6 @@
"MapServer",
"MapAsyncServer",
"MapMultiprocServer",
"UserMetadata",
"SystemMetadata",
]
Loading