File

src/lib/services/adapters/xmpp/xmpp-chat-connection.service.ts

Description

Implementation of the XMPP specification according to RFC 6121. See https://xmpp.org/rfcs/rfc6121.html See https://xmpp.org/rfcs/rfc3920.html See https://xmpp.org/rfcs/rfc3921.html

Index

Properties
Methods

Constructor

constructor(logService: LogService, ngZone: NgZone, xmppClientFactoryService: XmppClientFactoryService)
Parameters :
Name Type Optional
logService LogService No
ngZone NgZone No
xmppClientFactoryService XmppClientFactoryService No

Methods

getNextRequestId
getNextRequestId()
Returns : string
Private isIqStanzaResponse
isIqStanzaResponse(stanza: Stanza)
Parameters :
Name Type Optional
stanza Stanza No
Returns : IqResponseStanza
Async logIn
logIn(logInRequest: LogInRequest)
Parameters :
Name Type Optional
logInRequest LogInRequest No
Returns : Promise<void>
Async logOut
logOut()
Returns : Promise<void>
Private onOffline
onOffline()
Returns : void
Public onOnline
onOnline(jid: JID)
Parameters :
Name Type Optional
jid JID No
Returns : void
Public onStanzaReceived
onStanzaReceived(stanza: Stanza)
Parameters :
Name Type Optional
stanza Stanza No
Returns : void
reconnectSilently
reconnectSilently()
Returns : void
Public Async send
send(content: any)
Parameters :
Name Type Optional
content any No
Returns : Promise<void>
Public sendAwaitingResponse
sendAwaitingResponse(request: Element)
Parameters :
Name Type Optional
request Element No
Returns : Promise<Stanza>
Public Async sendIq
sendIq(request: Element)
Parameters :
Name Type Optional
request Element No
Returns : Promise<IqResponseStanza<result>>
Public Async sendIqAckResult
sendIqAckResult(id: string)
Parameters :
Name Type Optional
id string No
Returns : Promise<void>
Public Async sendPresence
sendPresence()
Returns : Promise<void>
Private skipXmppClientResponses
skipXmppClientResponses(stanza: Stanza)

We should skip our iq handling for the following xmpp/client response:

Parameters :
Name Type Optional
stanza Stanza No
Returns : boolean

Properties

Public Optional client
Type : Client
Private requestId
Default value : new Date().getTime()
Private Readonly stanzaResponseHandlers
Default value : new Map<string, [(stanza: Stanza) => void, (e: Error) => void]>()
Public Readonly stanzaUnknown$
Default value : new Subject<Stanza>()
Public Readonly state$
Default value : new BehaviorSubject<XmppChatStates>('disconnected')
Public Optional userJid
Type : JID

User JID with resource, not bare.

import { Injectable, NgZone } from '@angular/core';
import { Client, xml } from '@xmpp/client';
import { JID } from '@xmpp/jid';
import { Element } from 'ltx';
import { BehaviorSubject, Subject } from 'rxjs';
import { LogInRequest } from '../../../core/log-in-request';
import { IqResponseStanza, Stanza } from '../../../core/stanza';
import { LogService } from '../../log.service';
import { XmppResponseError } from './xmpp-response.error';
import { XmppClientFactoryService } from './xmpp-client-factory.service';

export type XmppChatStates = 'disconnected' | 'online' | 'reconnecting';

/**
 * Implementation of the XMPP specification according to RFC 6121.
 * @see https://xmpp.org/rfcs/rfc6121.html
 * @see https://xmpp.org/rfcs/rfc3920.html
 * @see https://xmpp.org/rfcs/rfc3921.html
 */
@Injectable()
export class XmppChatConnectionService {

    public readonly state$ = new BehaviorSubject<XmppChatStates>('disconnected');
    public readonly stanzaUnknown$ = new Subject<Stanza>();

    /**
     * User JID with resource, not bare.
     */
    public userJid?: JID;
    private requestId = new Date().getTime();
    private readonly stanzaResponseHandlers = new Map<string, [(stanza: Stanza) => void, (e: Error) => void]>();
    public client?: Client;

    constructor(
        private readonly logService: LogService,
        private readonly ngZone: NgZone,
        private readonly xmppClientFactoryService: XmppClientFactoryService,
    ) {}

    public onOnline(jid: JID): void {
        this.logService.info('online =', 'online as', jid.toString());
        this.userJid = jid;
        this.state$.next('online');
    }

    private onOffline(): void {
        this.stanzaResponseHandlers.forEach(([, reject]) => reject(new Error('offline')));
        this.stanzaResponseHandlers.clear();
    }

    public async sendPresence(): Promise<void> {
        await this.send(
            xml('presence'),
        );
    }

    public async send(content: any): Promise<void> {
        this.logService.debug('>>>', content);
        await this.client.send(content);
    }

    public sendAwaitingResponse(request: Element): Promise<Stanza> {
        return new Promise((resolve, reject) => {
            request.attrs = {
                id: this.getNextRequestId(),
                from: this.userJid.toString(),
                ...request.attrs,
            };
            const {id} = request.attrs;

            this.stanzaResponseHandlers.set(id, [
                (response) => {
                    if (response.attrs.type === 'error') {
                        reject(new XmppResponseError(response));
                        return;
                    }

                    resolve(response);
                },
                reject,
            ]);

            this.send(request).catch((e: unknown) => {
                this.logService.error('error sending stanza', e);
                this.stanzaResponseHandlers.delete(id);
                reject(e);
            });
        });
    }

    public onStanzaReceived(stanza: Stanza): void {
        let handled = false;

        const [handleResponse] = this.stanzaResponseHandlers.get(stanza.attrs.id) ?? [];
        if (handleResponse) {
            this.logService.debug('<<<', stanza.toString(), 'handled by response handler');
            this.stanzaResponseHandlers.delete(stanza.attrs.id);
            handleResponse(stanza);
            handled = true;
        }

        if (!handled) {
            this.stanzaUnknown$.next(stanza);
        }
    }

    public async sendIq(request: Element): Promise<IqResponseStanza<'result'>> {
        const requestType: string | undefined = request.attrs.type;
        // see https://datatracker.ietf.org/doc/html/draft-ietf-xmpp-3920bis#section-8.2.3
        if (!requestType || (requestType !== 'get' && requestType !== 'set')) {
            const message = `iq stanza without type: ${request.toString()}`;
            this.logService.error(message);
            throw new Error(message);
        }

        const response = await this.sendAwaitingResponse(request);
        if (!this.isIqStanzaResponse(response)) {
            throw new Error(`received unexpected stanza as iq response: type=${response.attrs.type}, stanza=${response.toString()}`);
        }
        return response as IqResponseStanza<'result'>;
    }

    private isIqStanzaResponse(stanza: Stanza): stanza is IqResponseStanza {
        const stanzaType = stanza.attrs.type;
        return stanza.name === 'iq' && (stanzaType === 'result' || stanzaType === 'error');
    }

    public async sendIqAckResult(id: string): Promise<void> {
        await this.send(
            xml('iq', {from: this.userJid.toString(), id, type: 'result'}),
        );
    }

    async logIn(logInRequest: LogInRequest): Promise<void> {
        await this.ngZone.runOutsideAngular(async () => {
            if (logInRequest.username.indexOf('@') >= 0) {
                this.logService.warn('username should not contain domain, only local part, this can lead to errors!');
            }

            this.client = this.xmppClientFactoryService.client(logInRequest);

            this.client.on('error', (err: any) => {
                this.ngZone.run(() => {
                    this.logService.error('chat service error =>', err.toString(), err);
                });
            });

            this.client.on('status', (status: any, value: any) => {
                this.ngZone.run(() => {
                    this.logService.info('status update =', status, value ? JSON.stringify(value) : '');
                    if (status === 'offline') {
                        this.state$.next('disconnected');
                    }
                });
            });

            this.client.on('online', (jid: JID) => {
                return this.ngZone.run(() => {
                    return this.onOnline(jid);
                });
            });

            this.client.on('stanza', (stanza: Stanza) => {
                this.ngZone.run(() => {
                    if (this.skipXmppClientResponses(stanza)) {
                        return;
                    }
                    this.onStanzaReceived(stanza);
                });
            });

            this.client.on('disconnect', () => {
                this.ngZone.run(() => {
                    this.state$.next('reconnecting');
                });
            });

            this.client.on('offline', () => {
                this.ngZone.run(() => {
                    this.onOffline();
                });
            });

            await this.client.start();
        });
    }

    /**
     * We should skip our iq handling for the following xmpp/client response:
     * - resource bind on start by https://xmpp.org/rfcs/rfc6120.html#bind
     */
    private skipXmppClientResponses(stanza: Stanza) {
        const xmppBindNS = 'urn:ietf:params:xml:ns:xmpp-bind';
        return stanza.getChild('bind')?.getNS() === xmppBindNS;
    }

    async logOut(): Promise<void> {
        // TODO: move this to a presence plugin in a handler
        this.logService.debug('logging out');
        if (this.client) {
            this.client.reconnect.stop();
            try {
                await this.send(xml('presence', {type: 'unavailable'}));
            } catch (e) {
                this.logService.error('error sending presence unavailable');
            } finally {
                this.client.stop();
            }
        }
    }

    getNextRequestId(): string {
        return String(this.requestId++);
    }

    reconnectSilently(): void {
        this.logService.warn('hard reconnect...');
        this.state$.next('disconnected');
    }
}

results matching ""

    No results matching ""