GraphQLWebsocketCommunicator
A test communicator for GraphQL over Websockets.
```python
import pytest
from strawberry.channels.testing import GraphQLWebsocketCommunicator
from myapp.asgi import application


@pytest.fixture
async def gql_communicator():
    async with GraphQLWebsocketCommunicator(application, path="/graphql") as client:
        yield client


async def test_subscribe_echo(gql_communicator):
    async for res in gql_communicator.subscribe(
        query='subscription { echo(message: "Hi") }'
    ):
        assert res.data == {"echo": "Hi"}
```

Constructor:
Create a new communicator.
Signature:
```python
def __init__(
    self,
    application: ASGIApplication,
    path: str,
    headers: List[Tuple[bytes, bytes]] | None = None,
    protocol: str = GRAPHQL_TRANSPORT_WS_PROTOCOL,
    connection_params: dict = ExprDict,
    kwargs: Any = {},
) -> None: ...
```

Parameters:
-  
application: Your ASGI application that encapsulates the Strawberry schema.
- Type
 -  
ASGIApplication 
 -  
path: The URL endpoint for the schema.
- Type
 -  
str 
 -  
headers: A list of tuples to be sent as headers to the server.
- Type
 -  
List[Tuple[bytes, bytes]] | None - Default
 -  
None 
 -  
protocol: Currently this supports `graphql-transport-ws` only.
- Type
 -  
str - Default
 -  
GRAPHQL_TRANSPORT_WS_PROTOCOL 
 -  
connection_params: A dictionary of connection parameters to send to the server.
- Type
 -  
dict - Default
 -  
ExprDict 
 -  
kwargs:- Type
 -  
Any - Default
 -  
{} 
 
Methods:
-  
gql_init
Signature:
```python
def gql_init(self) -> None: ...
```
-  
subscribe
Signature:
```python
def subscribe(
    self, query: str, variables: Dict | None = None
) -> ExecutionResult | AsyncIterator[ExecutionResult]: ...
```

Parameters:
-  
query:- Type
 -  
str 
 -  
variables:- Type
 -  
Dict | None - Default
 -  
None 
 
 -  
 -  
process_errors
Reconstructs a list of GraphQLError objects from a list of GraphQLFormattedError values.
Signature:
```python
def process_errors(
    self, errors: List[GraphQLFormattedError]
) -> List[GraphQLError]: ...
```

Parameters:
-  
errors:- Type
 -  
List[GraphQLFormattedError] 
 
 -  
 
Attributes:
-  
protocol: -  
connection_params: