BaseGraphQLWSHandler
Constructor:
Signature:
def __init__(self, schema: BaseSchema, debug: bool, keep_alive: bool, keep_alive_interval: float) -> None: ...
Parameters:
-
schema:
- Type
-
BaseSchema
-
debug:
- Type
-
bool
-
keep_alive:
- Type
-
bool
-
keep_alive_interval:
- Type
-
float
Methods:
-
get_context
Return the operation's context.
Signature:
def get_context(self) -> Any: ...
-
get_root_value
Return the schema's root value.
Signature:
def get_root_value(self) -> Any: ...
-
send_json
Send the data JSON encoded to the WebSocket client.
Signature:
def send_json(self, data: OperationMessage) -> None: ...
Parameters:
-
data:
- Type
-
OperationMessage
-
-
close
Close the WebSocket with the passed code and reason.
Signature:
def close(self, code: int = 1000, reason: str | None = None) -> None: ...
Parameters:
-
code:
- Type
-
int
- Default
-
1000
-
reason:
- Type
-
str | None
- Default
-
None
-
-
handle_request
Handle the request this instance was created for.
Signature:
def handle_request(self) -> Any: ...
-
handle
Signature:
def handle(self) -> Any: ...
-
handle_message
Signature:
def handle_message(self, message: OperationMessage) -> None: ...
Parameters:
-
message:
- Type
-
OperationMessage
-
-
handle_connection_init
Signature:
def handle_connection_init(self, message: OperationMessage) -> None: ...
Parameters:
-
message:
- Type
-
OperationMessage
-
-
handle_connection_terminate
Signature:
def handle_connection_terminate(self, message: OperationMessage) -> None: ...
Parameters:
-
message:
- Type
-
OperationMessage
-
-
handle_start
Signature:
def handle_start(self, message: OperationMessage) -> None: ...
Parameters:
-
message:
- Type
-
OperationMessage
-
-
handle_stop
Signature:
def handle_stop(self, message: OperationMessage) -> None: ...
Parameters:
-
message:
- Type
-
OperationMessage
-
-
handle_keep_alive
Signature:
def handle_keep_alive(self) -> None: ...
-
handle_async_results
Signature:
def handle_async_results(self, result_source: AsyncGenerator, operation_id: str) -> None: ...
Parameters:
-
result_source:
- Type
-
AsyncGenerator
-
operation_id:
- Type
-
str
-
-
cleanup_operation
Signature:
def cleanup_operation(self, operation_id: str) -> None: ...
Parameters:
-
operation_id:
- Type
-
str
-
-
send_message
Signature:
def send_message(self, type_: str, operation_id: str, payload: OperationMessagePayload | None = None) -> None: ...
Parameters:
-
type_:
- Type
-
str
-
operation_id:
- Type
-
str
-
payload:
- Type
-
OperationMessagePayload | None
- Default
-
None
-
Attributes:
-
schema:
-
debug:
-
keep_alive:
-
keep_alive_interval:
-
keep_alive_task:
- Type
-
asyncio.Task | None
-
subscriptions:
- Type
-
Dict[str, AsyncGenerator]
-
tasks:
- Type
-
Dict[str, asyncio.Task]
-
connection_params:
- Type
-
ConnectionInitPayload | None