GraphQLWebsocketCommunicator
A test communicator for GraphQL over Websockets.
import pytest
from strawberry.channels.testing import GraphQLWebsocketCommunicator
from myapp.asgi import application


@pytest.fixture
async def gql_communicator():
    async with GraphQLWebsocketCommunicator(application, path="/graphql") as client:
        yield client


async def test_subscribe_echo(gql_communicator):
    async for res in gql_communicator.subscribe(
        query='subscription { echo(message: "Hi") }'
    ):
        assert res.data == {"echo": "Hi"}
Constructor:
Create a new communicator.
Signature:
def __init__(
    self,
    application: ASGIApplication,
    path: str,
    headers: List[Tuple[bytes, bytes]] | None = None,
    protocol: str = GRAPHQL_TRANSPORT_WS_PROTOCOL,
    connection_params: dict = {},
    kwargs: Any = {},
) -> None: ...
Parameters:
- application: Your ASGI application that encapsulates the Strawberry schema.
  - Type: ASGIApplication
- path: The URL endpoint for the schema.
  - Type: str
- headers: A list of (name, value) tuples to be sent as headers to the server.
  - Type: List[Tuple[bytes, bytes]] | None
  - Default: None
- protocol: Currently, only graphql-transport-ws is supported.
  - Type: str
  - Default: GRAPHQL_TRANSPORT_WS_PROTOCOL
- connection_params: A dictionary of connection parameters to send to the server.
  - Type: dict
  - Default: {}
- kwargs:
  - Type: Any
  - Default: {}
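As a rough illustration of the constructor parameters, the sketch below builds the headers argument in the List[Tuple[bytes, bytes]] shape and passes connection_params alongside it. The helper names build_headers and make_communicator, the Authorization header, and the authToken key are hypothetical and chosen only for illustration. The Strawberry import is deferred so that build_headers can be used on its own.

```python
from typing import Any, Dict, List, Tuple


def build_headers(headers: Dict[str, str]) -> List[Tuple[bytes, bytes]]:
    """Convert a str-to-str mapping into (bytes, bytes) header pairs."""
    # ASGI header names are lowercased byte strings; values are bytes too.
    return [
        (name.lower().encode("latin-1"), value.encode("latin-1"))
        for name, value in headers.items()
    ]


def make_communicator(application: Any, token: str) -> Any:
    """Hypothetical factory: build a communicator with headers and params."""
    # Imported lazily so this module loads even without Strawberry installed.
    from strawberry.channels.testing import GraphQLWebsocketCommunicator

    return GraphQLWebsocketCommunicator(
        application,
        path="/graphql",
        headers=build_headers({"Authorization": f"Bearer {token}"}),
        # Assumed key name; use whatever your server reads on connection init.
        connection_params={"authToken": token},
    )
```

For example, build_headers({"Authorization": "Bearer abc"}) returns [(b"authorization", b"Bearer abc")], which matches the type expected by the headers parameter.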
Methods:
- gql_init
  Signature:
  def gql_init(self) -> None: ...
- subscribe
  Signature:
  def subscribe(
      self, query: str, variables: Dict | None = None
  ) -> ExecutionResult | AsyncIterator[ExecutionResult]: ...
  Parameters:
  - query:
    - Type: str
  - variables:
    - Type: Dict | None
    - Default: None
- process_errors
  Reconstructs GraphQLError objects from a list of GraphQLFormattedError values.
  Signature:
  def process_errors(self, errors: List[GraphQLFormattedError]) -> List[GraphQLError]: ...
  Parameters:
  - errors:
    - Type: List[GraphQLFormattedError]
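The subscribe method accepts an optional variables argument in addition to the query. The following is a minimal sketch of how a test helper might iterate over results and check for errors. It assumes each result exposes data and errors attributes, as the usage example at the top suggests for data. The collect_data helper, the FakeCommunicator stand-in, and the $message variable are hypothetical; in a real test you would pass the communicator from your fixture instead of the fake.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Dict, List, Optional


async def collect_data(
    communicator: Any,
    query: str,
    variables: Optional[Dict[str, Any]] = None,
    limit: int = 10,
) -> List[Any]:
    """Gather `data` from up to `limit` results, failing fast on errors."""
    collected: List[Any] = []
    async for result in communicator.subscribe(query=query, variables=variables):
        # Surface GraphQL errors instead of silently collecting None.
        if getattr(result, "errors", None):
            raise AssertionError(f"subscription returned errors: {result.errors}")
        collected.append(result.data)
        if len(collected) >= limit:
            break
    return collected


class FakeCommunicator:
    """Hypothetical stand-in that mimics only the subscribe() call shape."""

    async def subscribe(
        self, query: str, variables: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[Any]:
        message = (variables or {}).get("message", "Hi")
        for _ in range(3):
            yield SimpleNamespace(data={"echo": message}, errors=None)


ECHO_QUERY = "subscription Echo($message: String!) { echo(message: $message) }"
results = asyncio.run(
    collect_data(FakeCommunicator(), ECHO_QUERY, {"message": "Hello"}, limit=2)
)
```

Capping the number of collected results with limit keeps a test from hanging on a subscription that never completes.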
Attributes:
- protocol
- connection_params
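Both attributes relate to the graphql-transport-ws subprotocol. The sketch below shows the JSON message shapes that protocol defines for opening a connection and starting an operation. That the communicator sends connection_params as the connection_init payload is an assumption based on the protocol, not something this page states, and the helper names and the authToken key are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def connection_init_message(connection_params: Optional[Dict[str, Any]] = None) -> str:
    """First client message; the optional payload carries connection params."""
    message: Dict[str, Any] = {"type": "connection_init"}
    if connection_params:
        message["payload"] = connection_params
    return json.dumps(message)


def subscribe_message(
    operation_id: str, query: str, variables: Optional[Dict[str, Any]] = None
) -> str:
    """Client message that starts one operation, identified by a unique id."""
    payload: Dict[str, Any] = {"query": query}
    if variables is not None:
        payload["variables"] = variables
    return json.dumps({"id": operation_id, "type": "subscribe", "payload": payload})


init = json.loads(connection_init_message({"authToken": "secret"}))
start = json.loads(subscribe_message("1", 'subscription { echo(message: "Hi") }'))
```

In that protocol the server answers connection_init with a connection_ack message, then delivers each result in a next message and signals the end of the operation with complete.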